2024-11-06 06:00:15 +00:00
|
|
|
from plesna.compute.consume_flux import consume_flux
|
|
|
|
from plesna.models.flux import Flux
|
|
|
|
from plesna.models.storage import Table
|
|
|
|
from plesna.models.transformation import Transformation
|
|
|
|
|
|
|
|
|
|
|
|
def test_consume_flux():
|
|
|
|
sources = {
|
2025-01-03 14:56:29 +00:00
|
|
|
"src1": Table(
|
|
|
|
id="src1", repo_id="test", schema_id="test", name="test", value="here"
|
|
|
|
),
|
|
|
|
"src2": Table(
|
|
|
|
id="src2", repo_id="test", schema_id="test", name="test", value="here"
|
|
|
|
),
|
2024-11-06 06:00:15 +00:00
|
|
|
}
|
|
|
|
targets = {
|
2025-01-03 14:56:29 +00:00
|
|
|
"tgt1": Table(
|
|
|
|
id="tgt1", repo_id="test", schema_id="test", name="test", value="this"
|
|
|
|
),
|
|
|
|
"tgt2": Table(
|
|
|
|
id="tgt2", repo_id="test", schema_id="test", name="test", value="that"
|
|
|
|
),
|
2024-11-06 06:00:15 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
def func(sources, targets, **kwrds):
|
|
|
|
return {
|
|
|
|
"sources": len(sources),
|
|
|
|
"targets": len(targets),
|
|
|
|
"kwrds": len(kwrds),
|
|
|
|
}
|
|
|
|
|
|
|
|
flux = Flux(
|
|
|
|
sources=sources,
|
|
|
|
targets=targets,
|
|
|
|
transformation=Transformation(function=func, extra_kwrds={"extra": "super"}),
|
|
|
|
)
|
|
|
|
|
|
|
|
meta = consume_flux(flux)
|
|
|
|
assert meta.data == {
|
|
|
|
"sources": 2,
|
|
|
|
"targets": 2,
|
|
|
|
"kwrds": 1,
|
|
|
|
}
|