From d04bfe1d4444a0bc08e58d3155e2182abdf6187e Mon Sep 17 00:00:00 2001 From: Bertrand Benjamin Date: Sat, 4 Jan 2025 15:30:32 +0100 Subject: [PATCH] Feat: add execute_flux --- plesna/dataplatform.py | 8 +++- tests/dataplatform/test_dataplateform.py | 60 ++++++++++++++++++------ 2 files changed, 53 insertions(+), 15 deletions(-) diff --git a/plesna/dataplatform.py b/plesna/dataplatform.py index 9c889bf..8964890 100644 --- a/plesna/dataplatform.py +++ b/plesna/dataplatform.py @@ -1,5 +1,6 @@ +from plesna.compute.consume_flux import consume_flux from plesna.graph.graph_set import GraphSet -from plesna.models.flux import Flux +from plesna.models.flux import Flux, FluxMetaData from plesna.storage.repository.repository import Repository @@ -41,3 +42,8 @@ class DataPlateform: def flux(self, name: str) -> Flux: return self._fluxes[name] + + def execute_flux(self, name: str) -> FluxMetaData: + if name not in self._fluxes: + raise DataPlateformError("The flux {name} is not registered") + return consume_flux(self._fluxes[name]) diff --git a/tests/dataplatform/test_dataplateform.py b/tests/dataplatform/test_dataplateform.py index 2ed9f0a..69e8ca5 100644 --- a/tests/dataplatform/test_dataplateform.py +++ b/tests/dataplatform/test_dataplateform.py @@ -54,21 +54,21 @@ def test_add_repository( @pytest.fixture -def dataplatform( - repository: FSRepository, -) -> DataPlateform: - dp = DataPlateform() - dp.add_repository("test", repository) - return dp +def foo_flux(repository: FSRepository) -> Flux: + src = {"username": repository.table("raw", "username")} + targets = {"username": repository.table("bronze", "username")} + def foo(sources, targets): + return {"who": "foo"} -def test_listing_content(dataplatform: DataPlateform): - assert dataplatform.repository("test").schemas() == ["raw", "bronze", "silver"] - assert dataplatform.repository("test").schema("raw").tables == [ - "recovery", - "username", - "salary", - ] + extra_kwrds = {} + + flux = Flux( + sources=src, + targets=targets, + transformation=Transformation(function=foo, extra_kwrds=extra_kwrds), + ) + return flux @pytest.fixture @@ -89,7 +89,10 @@ def copy_flux(repository: FSRepository) -> Flux: return flux -def test_add_flux(dataplatform: DataPlateform, copy_flux: Flux): +def test_add_flux(repository: FSRepository, copy_flux: Flux): + dataplatform = DataPlateform() + dataplatform.add_repository("test", repository) + dataplatform.add_flux(name="copy_flux", flux=copy_flux) assert dataplatform.fluxes == ["copy_flux"] dataplatform.add_flux(name="copy_flux_bis", flux=copy_flux) @@ -97,3 +100,32 @@ def test_add_flux(dataplatform: DataPlateform, copy_flux: Flux): assert dataplatform.flux("copy_flux") == copy_flux assert dataplatform.flux("copy_flux_bis") == copy_flux + + +@pytest.fixture +def dataplatform( + repository: FSRepository, + foo_flux: Flux, + copy_flux: Flux, +) -> DataPlateform: + dp = DataPlateform() + + dp.add_repository("test", repository) + + dp.add_flux("foo", foo_flux) + dp.add_flux("copy", copy_flux) + return dp + + +def test_listing_content(dataplatform: DataPlateform): + assert dataplatform.repository("test").schemas() == ["raw", "bronze", "silver"] + assert dataplatform.repository("test").schema("raw").tables == [ + "recovery", + "username", + "salary", + ] + + +def test_execute_flux(dataplatform: DataPlateform): + meta = dataplatform.execute_flux("foo") + assert meta.data == {"who": "foo"}