From ae61fd3c12361dd9d2849d46fa4b9c86e31086b9 Mon Sep 17 00:00:00 2001 From: Bertrand Benjamin Date: Sun, 5 Jan 2025 14:55:46 +0100 Subject: [PATCH] refact: use repository id in dataplatform --- plesna/dataplatform.py | 35 ++++++++++--- plesna/graph/graph_set.py | 1 - plesna/storage/repository/fs_repository.py | 7 ++- plesna/storage/repository/repository.py | 13 ++++- tests/dataplatform/test_dataplateform.py | 60 +++++++++++++--------- tests/storage/test_fs_repository.py | 4 +- 6 files changed, 80 insertions(+), 40 deletions(-) diff --git a/plesna/dataplatform.py b/plesna/dataplatform.py index 8964890..00eb193 100644 --- a/plesna/dataplatform.py +++ b/plesna/dataplatform.py @@ -1,4 +1,5 @@ from plesna.compute.consume_flux import consume_flux +from plesna.graph.graph import Graph, Node from plesna.graph.graph_set import GraphSet from plesna.models.flux import Flux, FluxMetaData from plesna.storage.repository.repository import Repository @@ -11,23 +12,38 @@ class DataPlateformError(Exception): class DataPlateform: def __init__(self): self._graphset = GraphSet() + self._graph = Graph() self._metadata_engine = "" self._fluxes = {} self._repositories = {} - def add_repository(self, name: str, repository: Repository) -> str: - if name in self._repositories: - raise DataPlateformError("The repository {name} already exists") + @property + def graphset(self) -> GraphSet: + return self._graphset - self._repositories[name] = repository - return name + @property + def graph(self) -> Graph: + return self._graph + + def repository_graph_feed(self, repository_id: str): + for schema in self._repositories[repository_id].schemas(): + for table in self._repositories[repository_id].tables(schema): + self._graph.add_node(Node(name=table)) + + def add_repository(self, repository: Repository) -> str: + if repository.id in self._repositories: + raise DataPlateformError("The repository {repository.id} already exists") + + self._repositories[repository.id] = repository + self.repository_graph_feed(repository.id) + return repository.id @property def repositories(self) -> list[str]: return list(self._repositories) - def repository(self, name: str) -> Repository: - return self._repositories[name] + def repository(self, id: str) -> Repository: + return self._repositories[id] def add_flux(self, name: str, flux: Flux) -> str: if name in self._fluxes: @@ -36,6 +52,11 @@ class DataPlateform: self._fluxes[name] = flux return name + def flux_graphset_feed(self, flux_name: str): + for schema in self._repositories[repository_name].schemas(): + for table in self._repositories[repository_name].tables(schema): + self._graph.add_node(Node(name=table)) + @property def fluxes(self) -> list[str]: return list(self._fluxes) diff --git a/plesna/graph/graph_set.py b/plesna/graph/graph_set.py index e03d654..f79f1cd 100644 --- a/plesna/graph/graph_set.py +++ b/plesna/graph/graph_set.py @@ -5,7 +5,6 @@ from pydantic import BaseModel class Node(BaseModel): name: str - infos: dict = {} def __hash__(self): return hash(self.name) diff --git a/plesna/storage/repository/fs_repository.py b/plesna/storage/repository/fs_repository.py index 1ada1cd..7b463b2 100644 --- a/plesna/storage/repository/fs_repository.py +++ b/plesna/storage/repository/fs_repository.py @@ -86,11 +86,10 @@ class FSRepository(Repository): "table": "{repo_name}-{schema_name}-{table_name}", } - def __init__(self, name: str, basepath: str, id: str): - self._basepath = Path(basepath) - self.name = name - self.id = id + def __init__(self, id: str, name: str, basepath: str): + super().__init__(id, name) + self._basepath = Path(basepath) assert self._basepath.exists() def ls(self, dir="", only_files=False, only_directories=False, recursive=False) -> list[str]: diff --git a/plesna/storage/repository/repository.py b/plesna/storage/repository/repository.py index e679781..ba3b10c 100644 --- a/plesna/storage/repository/repository.py +++ b/plesna/storage/repository/repository.py @@ -4,8 +4,17 @@ from plesna.models.storage import Partition, Schema, Table class Repository: - def __init__(self): - pass + def __init__(self, id: str, name: str): + self._id = id + self._name = name + + @property + def id(self) -> str: + return self._id + + @property + def name(self) -> str: + return self._name @abc.abstractmethod def schemas(self) -> list[str]: diff --git a/tests/dataplatform/test_dataplateform.py b/tests/dataplatform/test_dataplateform.py index 678f38b..0867eef 100644 --- a/tests/dataplatform/test_dataplateform.py +++ b/tests/dataplatform/test_dataplateform.py @@ -4,6 +4,7 @@ from pathlib import Path import pytest from plesna.dataplatform import DataPlateform +from plesna.graph.graph import Node from plesna.models.flux import Flux from plesna.models.transformation import Transformation from plesna.storage.repository.fs_repository import FSRepository @@ -24,38 +25,20 @@ def repository(tmp_path) -> FSRepository: silver_path = Path(tmp_path) / "silver" silver_path.mkdir() - return FSRepository("test", tmp_path, "test") + return FSRepository("test", "test", tmp_path) def test_add_repository( repository: FSRepository, ): dp = DataPlateform() - dp.add_repository("test", repository) + dp.add_repository(repository) assert dp.repositories == ["test"] assert dp.repository("test") == repository -@pytest.fixture -def foo_flux(repository: FSRepository) -> Flux: - src = {"username": repository.table("test-raw-username")} - targets = {"username": repository.table("test-bronze-username")} - - def foo(sources, targets): - return {"who": "foo"} - - extra_kwrds = {} - - flux = Flux( - sources=src, - targets=targets, - transformation=Transformation(function=foo, extra_kwrds=extra_kwrds), - ) - return flux - - @pytest.fixture def copy_flux(repository: FSRepository) -> Flux: raw_username = {"username": repository.table("test-raw-username")} @@ -77,9 +60,30 @@ def copy_flux(repository: FSRepository) -> Flux: return raw_brz_copy_username +@pytest.fixture +def foo_flux(repository: FSRepository) -> Flux: + src = { + "username": repository.table("test-raw-username"), + "recovery": repository.table("test-raw-recovery"), + } + targets = {"username_foo": repository.table("test-bronze-foo")} + + def foo(sources, targets): + return {"who": "foo"} + + extra_kwrds = {} + + flux = Flux( + sources=src, + targets=targets, + transformation=Transformation(function=foo, extra_kwrds=extra_kwrds), + ) + return flux + + def test_add_flux(repository: FSRepository, copy_flux: Flux): dataplatform = DataPlateform() - dataplatform.add_repository("test", repository) + dataplatform.add_repository(repository) dataplatform.add_flux(name="copy_flux", flux=copy_flux) assert dataplatform.fluxes == ["copy_flux"] @@ -98,7 +102,7 @@ def dataplatform( ) -> DataPlateform: dp = DataPlateform() - dp.add_repository("test", repository) + dp.add_repository(repository) dp.add_flux("foo", foo_flux) dp.add_flux("raw_brz_copy_username", copy_flux) @@ -121,8 +125,16 @@ def test_listing_content(dataplatform: DataPlateform): def test_content_from_graph(dataplatform: DataPlateform): - # assert dataplatform.graphset.model_dump() == {} - pass + assert dataplatform.graph.nodes == { + Node(name="test-raw-recovery", infos={}), + Node(name="test-raw-salary", infos={}), + Node(name="test-raw-username", infos={}), + } + + # assert dataplatform.graphset.node_sets == { + # Node(name="test-raw-username", infos={}), + # Node(name="test-bronze-username", infos={}), + # } def test_execute_flux(dataplatform: DataPlateform): diff --git a/tests/storage/test_fs_repository.py b/tests/storage/test_fs_repository.py index b6feabc..a40aa95 100644 --- a/tests/storage/test_fs_repository.py +++ b/tests/storage/test_fs_repository.py @@ -20,7 +20,7 @@ def location(tmp_path): def test_init(location): - repo = FSRepository("example", location, "example") + repo = FSRepository("example", "example", location) assert repo.ls() == [ "schema", ] @@ -45,7 +45,7 @@ def test_init(location): @pytest.fixture def repository(location) -> FSRepository: - return FSRepository("example", location, "example") + return FSRepository("example", "example", location) def test_list_schemas(repository):