plesna/tests/compute/test_consume_flux.py

36 lines
1.1 KiB
Python
Raw Normal View History

from plesna.compute.consume_flux import consume_flux
from plesna.models.flux import Flux
from plesna.models.storage import Table
from plesna.models.transformation import Transformation
def test_consume_flux():
sources = [
Table(id="src1", repo_id="test", schema_id="test", name="test", value="here", datas=["d"]),
Table(id="src2", repo_id="test", schema_id="test", name="test", value="here", datas=["d"]),
]
targets = [
Table(id="tgt1", repo_id="test", schema_id="test", name="test", value="this", datas=["d"]),
Table(id="tgt2", repo_id="test", schema_id="test", name="test", value="that", datas=["d"]),
]
def func(sources, targets, **kwrds):
return {
"sources": len(sources),
"targets": len(targets),
"kwrds": len(kwrds),
}
flux = Flux(
sources=sources,
targets=targets,
transformation=Transformation(function=func, extra_kwrds={"extra": "super"}),
)
meta = consume_flux(flux)
assert meta.data == {
"sources": 2,
"targets": 2,
"kwrds": 1,
}