diff --git a/plesna/dataplatform.py b/plesna/dataplatform.py index 7daa365..e34fac6 100644 --- a/plesna/dataplatform.py +++ b/plesna/dataplatform.py @@ -32,24 +32,28 @@ class DataPlateform: def repository(self, id: str) -> DataRepository: return self._repositories[id] - def add_flux(self, name: str, flux: Flux) -> str: - if name in self._fluxes: - raise DataPlateformError("The flux {name} already exists") + def is_valid_flux(self, flux: Flux) -> bool: + return True - self._fluxes[name] = flux - return name + def add_flux(self, flux: Flux) -> str: + if flux.id in self._fluxes: + raise DataPlateformError("The flux {flux} already exists") + + assert self.is_valid_flux(flux) + self._fluxes[flux.id] = flux + return flux.id @property def fluxes(self) -> list[str]: return list(self._fluxes) - def flux(self, name: str) -> Flux: - return self._fluxes[name] + def flux(self, flux_id: str) -> Flux: + return self._fluxes[flux_id] - def execute_flux(self, name: str) -> FluxMetaData: - if name not in self._fluxes: - raise DataPlateformError("The flux {name} is not registered") - return consume_flux(self._fluxes[name]) + def execute_flux(self, flux_id: str) -> FluxMetaData: + if flux_id not in self._fluxes: + raise DataPlateformError("The flux {flux_id} is not registered") + return consume_flux(self._fluxes[flux_id]) def graphset( self, diff --git a/tests/dataplatform/test_dataplateform.py b/tests/dataplatform/test_dataplateform.py index f92a443..fce1f6d 100644 --- a/tests/dataplatform/test_dataplateform.py +++ b/tests/dataplatform/test_dataplateform.py @@ -84,17 +84,17 @@ def foo_flux(repository: FSDataRepository) -> Flux: return flux -def test_add_flux(repository: FSDataRepository, copy_flux: Flux): +def test_add_flux(repository: FSDataRepository, copy_flux: Flux, foo_flux: Flux): dataplatform = DataPlateform() dataplatform.add_repository(repository) - dataplatform.add_flux(name="copy_flux", flux=copy_flux) + dataplatform.add_flux(flux=copy_flux) assert dataplatform.fluxes == ["copy_flux"] - dataplatform.add_flux(name="copy_flux_bis", flux=copy_flux) - assert dataplatform.fluxes == ["copy_flux", "copy_flux_bis"] + dataplatform.add_flux(flux=foo_flux) + assert dataplatform.fluxes == ["copy_flux", "foo_flux"] assert dataplatform.flux("copy_flux") == copy_flux - assert dataplatform.flux("copy_flux_bis") == copy_flux + assert dataplatform.flux("foo_flux") == foo_flux @pytest.fixture @@ -107,8 +107,8 @@ def dataplatform( dp.add_repository(repository) - dp.add_flux("foo", foo_flux) - dp.add_flux("raw_brz_copy_username", copy_flux) + dp.add_flux(foo_flux) + dp.add_flux(copy_flux) return dp @@ -269,12 +269,12 @@ def test_content_from_graph_arguments(dataplatform: DataPlateform): def test_execute_flux(dataplatform: DataPlateform): - meta = dataplatform.execute_flux("foo") + meta = dataplatform.execute_flux("foo_flux") assert meta.data == {"who": "foo"} assert dataplatform.repository("test").schema("test-bronze").tables == [] - meta = dataplatform.execute_flux("raw_brz_copy_username") + meta = dataplatform.execute_flux("copy_flux") assert meta.data == {"src_size": 283, "tgt_size": 283} assert dataplatform.repository("test").schema("test-bronze").tables == ["test-bronze-username"]