from plesna.compute.consume_flux import consume_flux
from plesna.models.flux import Flux
from plesna.models.storage import Table
from plesna.models.transformation import Transformation


def test_consume_flux():
    """Check the metadata returned by ``consume_flux`` for a simple flux.

    The flux is built from two source tables, two target tables and a
    transformation carrying one extra keyword argument. The transformation
    function reports how many sources, targets and extra keyword arguments
    it received, and the test asserts that ``meta.data`` equals that report.
    """

    def make_table(table_id, value):
        # All tables share the same placeholder repo/schema/name; only the
        # identifier and the value differ between them.
        return Table(
            id=table_id,
            repo_id="test",
            schema_id="test",
            name="test",
            value=value,
            datas=["d"],
        )

    sources = {table_id: make_table(table_id, "here") for table_id in ("src1", "src2")}
    targets = {
        "tgt1": make_table("tgt1", "this"),
        "tgt2": make_table("tgt2", "that"),
    }

    def func(sources, targets, **kwrds):
        # Echo back the sizes of everything received so the assertion below
        # can tell what the function was called with.
        counts = {}
        counts["sources"] = len(sources)
        counts["targets"] = len(targets)
        counts["kwrds"] = len(kwrds)
        return counts

    transformation = Transformation(function=func, extra_kwrds={"extra": "super"})
    flux = Flux(sources=sources, targets=targets, transformation=transformation)

    meta = consume_flux(flux)

    expected = {"sources": 2, "targets": 2, "kwrds": 1}
    assert meta.data == expected