From 1446c166cadd88a60b4de436e0e2af74ee43b2de Mon Sep 17 00:00:00 2001
From: Bertrand Benjamin
Date: Sat, 4 Jan 2025 13:51:24 +0100
Subject: [PATCH] Feat: add flux in dataplatform

---
 plesna/dataplatform.py                   | 20 +++++-
 plesna/storage/fake_datacatalogue.py     | 81 ------------------------
 tests/dataplatform/test_dataplateform.py | 31 ++++++++-
 3 files changed, 46 insertions(+), 86 deletions(-)
 delete mode 100644 plesna/storage/fake_datacatalogue.py

diff --git a/plesna/dataplatform.py b/plesna/dataplatform.py
index c3c17db..9c889bf 100644
--- a/plesna/dataplatform.py
+++ b/plesna/dataplatform.py
@@ -1,4 +1,5 @@
 from plesna.graph.graph_set import GraphSet
+from plesna.models.flux import Flux
 from plesna.storage.repository.repository import Repository
 
 
@@ -10,14 +11,15 @@ class DataPlateform:
     def __init__(self):
         self._graphset = GraphSet()
         self._metadata_engine = ""
-        self._transformations = {}
+        self._fluxes = {}
         self._repositories = {}
 
-    def add_repository(self, name: str, repository: Repository):
+    def add_repository(self, name: str, repository: Repository) -> str:
         if name in self._repositories:
             raise DataPlateformError("The repository {name} already exists")
 
         self._repositories[name] = repository
+        return name
 
     @property
     def repositories(self) -> list[str]:
@@ -25,3 +27,17 @@ class DataPlateform:
 
     def repository(self, name: str) -> Repository:
         return self._repositories[name]
+
+    def add_flux(self, name: str, flux: Flux) -> str:
+        if name in self._fluxes:
+            raise DataPlateformError("The flux {name} already exists")
+
+        self._fluxes[name] = flux
+        return name
+
+    @property
+    def fluxes(self) -> list[str]:
+        return list(self._fluxes)
+
+    def flux(self, name: str) -> Flux:
+        return self._fluxes[name]
diff --git a/plesna/storage/fake_datacatalogue.py b/plesna/storage/fake_datacatalogue.py
deleted file mode 100644
index 11c53b9..0000000
--- a/plesna/storage/fake_datacatalogue.py
+++ /dev/null
@@ -1,81 +0,0 @@
-from pathlib import Path
-
-from pydantic import BaseModel, computed_field
-
-from plesna.models.storage import Schema, Table
-
-from .datacatalogue import DataCatalogue
-
-
-class FakeSchema(BaseModel):
-    name: str
-
-    @computed_field
-    @property
-    def ref(self) -> Schema:
-        return Schema(
-            id=str(self.name),
-            value=str(self.name),
-        )
-
-
-class FakeTable(BaseModel):
-    name: str
-    data: dict[str, list]
-
-    @computed_field
-    @property
-    def ref(self) -> Table:
-        return Table(
-            id=str(self.name),
-            value=str(self.name),
-        )
-
-
-class FakeDataCatalogue(DataCatalogue):
-    """DataCatalogue based on dictionnaries"""
-
-    def __init__(self, name: str):
-        self.name = name
-
-    def ls(
-        self, dir="", only_files=False, only_directories=False, recursive=False
-    ) -> list[str]:
-        dirpath = self._basepath / dir
-
-        if only_files:
-            return [
-                str(f.relative_to(dirpath))
-                for f in dirpath.iterdir()
-                if not f.is_dir() and not str(f).startswith(".")
-            ]
-
-        if only_directories:
-            if recursive:
-                return [
-                    str(f[0].relative_to(dirpath))
-                    for f in dirpath.walk()
-                    if not str(f).startswith(".")
-                ]
-
-            return [
-                str(f.relative_to(dirpath))
-                for f in dirpath.iterdir()
-                if f.is_dir() and not str(f).startswith(".")
-            ]
-
-        return [
-            str(f.relative_to(dirpath))
-            for f in dirpath.iterdir()
-            if not str(f).startswith(".")
-        ]
-
-    def schemas(self) -> dict[str, FSSchema]:
-        """List schemas (sub directories within basepath)"""
-        subdirectories = self.ls("", only_directories=True, recursive=True)
-        return {str(path): FSSchema(path=path) for path in subdirectories}
-
-    def tables(self, schema_id=".") -> dict[str, FSTable]:
-        """List table in schema (which are files in the directory)"""
-        schema_path = schema_id
-        return {path: FSTable(path=path) for path in self.ls(schema_path, only_files=True)}
diff --git a/tests/dataplatform/test_dataplateform.py b/tests/dataplatform/test_dataplateform.py
index bb378e4..2ed9f0a 100644
--- a/tests/dataplatform/test_dataplateform.py
+++ b/tests/dataplatform/test_dataplateform.py
@@ -4,6 +4,8 @@ from pathlib import Path
 import pytest
 
 from plesna.dataplatform import DataPlateform
+from plesna.models.flux import Flux
+from plesna.models.transformation import Transformation
 from plesna.storage.repository.fs_repository import FSRepository
 
 FIXTURE_DIR = Path(__file__).parent.parent / Path("raw_datas")
@@ -69,6 +71,29 @@ def test_listing_content(dataplatform: DataPlateform):
     ]
 
 
-def test_add_flux(dataplatform: DataPlateform):
-    # dataplatform.add_flux()
-    pass
+@pytest.fixture
+def copy_flux(repository: FSRepository) -> Flux:
+    src = {"username": repository.table("raw", "username")}
+    targets = {"username": repository.table("bronze", "username")}
+
+    def copy(sources, targets):
+        pass
+
+    extra_kwrds = {}
+
+    flux = Flux(
+        sources=src,
+        targets=targets,
+        transformation=Transformation(function=copy, extra_kwrds=extra_kwrds),
+    )
+    return flux
+
+
+def test_add_flux(dataplatform: DataPlateform, copy_flux: Flux):
+    dataplatform.add_flux(name="copy_flux", flux=copy_flux)
+    assert dataplatform.fluxes == ["copy_flux"]
+    dataplatform.add_flux(name="copy_flux_bis", flux=copy_flux)
+    assert dataplatform.fluxes == ["copy_flux", "copy_flux_bis"]
+
+    assert dataplatform.flux("copy_flux") == copy_flux
+    assert dataplatform.flux("copy_flux_bis") == copy_flux