2024-11-06 06:00:15 +00:00
|
|
|
from plesna.compute.consume_flux import consume_flux
|
2025-01-05 14:43:29 +00:00
|
|
|
from plesna.models.flux import Flux, Transformation
|
2024-11-06 06:00:15 +00:00
|
|
|
from plesna.models.storage import Table
|
|
|
|
|
|
|
|
|
|
|
|
def test_consume_flux():
|
2025-01-05 14:31:40 +00:00
|
|
|
sources = [
|
|
|
|
Table(id="src1", repo_id="test", schema_id="test", name="test", value="here", datas=["d"]),
|
|
|
|
Table(id="src2", repo_id="test", schema_id="test", name="test", value="here", datas=["d"]),
|
|
|
|
]
|
|
|
|
targets = [
|
|
|
|
Table(id="tgt1", repo_id="test", schema_id="test", name="test", value="this", datas=["d"]),
|
|
|
|
Table(id="tgt2", repo_id="test", schema_id="test", name="test", value="that", datas=["d"]),
|
|
|
|
]
|
2024-11-06 06:00:15 +00:00
|
|
|
|
|
|
|
def func(sources, targets, **kwrds):
|
|
|
|
return {
|
|
|
|
"sources": len(sources),
|
|
|
|
"targets": len(targets),
|
|
|
|
"kwrds": len(kwrds),
|
|
|
|
}
|
|
|
|
|
|
|
|
flux = Flux(
|
2025-01-05 14:50:51 +00:00
|
|
|
id="flux",
|
|
|
|
name="flux",
|
2024-11-06 06:00:15 +00:00
|
|
|
sources=sources,
|
|
|
|
targets=targets,
|
2025-01-05 14:50:51 +00:00
|
|
|
transformation=Transformation(function=func, extra_kwrds={"extra": "super"}),
|
2024-11-06 06:00:15 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
meta = consume_flux(flux)
|
|
|
|
assert meta.data == {
|
|
|
|
"sources": 2,
|
|
|
|
"targets": 2,
|
|
|
|
"kwrds": 1,
|
|
|
|
}
|