refact: use repository id in dataplatform

This commit is contained in:
Bertrand Benjamin 2025-01-05 14:55:46 +01:00
parent d256fbf169
commit ae61fd3c12
6 changed files with 80 additions and 40 deletions

View File

@ -1,4 +1,5 @@
from plesna.compute.consume_flux import consume_flux from plesna.compute.consume_flux import consume_flux
from plesna.graph.graph import Graph, Node
from plesna.graph.graph_set import GraphSet from plesna.graph.graph_set import GraphSet
from plesna.models.flux import Flux, FluxMetaData from plesna.models.flux import Flux, FluxMetaData
from plesna.storage.repository.repository import Repository from plesna.storage.repository.repository import Repository
@ -11,23 +12,38 @@ class DataPlateformError(Exception):
class DataPlateform: class DataPlateform:
def __init__(self): def __init__(self):
self._graphset = GraphSet() self._graphset = GraphSet()
self._graph = Graph()
self._metadata_engine = "" self._metadata_engine = ""
self._fluxes = {} self._fluxes = {}
self._repositories = {} self._repositories = {}
def add_repository(self, name: str, repository: Repository) -> str: @property
if name in self._repositories: def graphset(self) -> GraphSet:
raise DataPlateformError("The repository {name} already exists") return self._graphset
self._repositories[name] = repository @property
return name def graph(self) -> Graph:
return self._graph
def repository_graph_feed(self, repository_id: str):
    """Add one graph node per table of the repository registered as ``repository_id``.

    Walks every schema reported by the repository, then every table of each
    schema, and adds a ``Node`` named after the table to the platform graph.

    Raises:
        KeyError: If no repository is registered under ``repository_id``.
    """
    repo = self._repositories[repository_id]
    for schema_name in repo.schemas():
        for table_name in repo.tables(schema_name):
            self._graph.add_node(Node(name=table_name))
def add_repository(self, repository: Repository) -> str:
    """Register a repository under its own id and feed the graph from it.

    Args:
        repository: Repository to register; its ``id`` is used as the key.

    Returns:
        The id under which the repository was registered.

    Raises:
        DataPlateformError: If a repository with the same id is already
            registered.
    """
    if repository.id in self._repositories:
        # f-string so the offending id is interpolated; without the prefix the
        # message contained the literal text "{repository.id}".
        raise DataPlateformError(f"The repository {repository.id} already exists")
    self._repositories[repository.id] = repository
    # Populate the graph only after the repository is stored, since the feed
    # looks the repository up by id.
    self.repository_graph_feed(repository.id)
    return repository.id
@property @property
def repositories(self) -> list[str]: def repositories(self) -> list[str]:
return list(self._repositories) return list(self._repositories)
def repository(self, name: str) -> Repository: def repository(self, id: str) -> Repository:
return self._repositories[name] return self._repositories[id]
def add_flux(self, name: str, flux: Flux) -> str: def add_flux(self, name: str, flux: Flux) -> str:
if name in self._fluxes: if name in self._fluxes:
@ -36,6 +52,11 @@ class DataPlateform:
self._fluxes[name] = flux self._fluxes[name] = flux
return name return name
# Presumably meant to feed the graph structures from the flux registered as
# `flux_name` -- TODO confirm the intended behaviour.
# NOTE(review): the body reads `repository_name`, which is not defined in this
# method (the only parameter is `flux_name`), so calling it as written raises
# NameError. The loops mirror repository_graph_feed and never use `flux_name`.
def flux_graphset_feed(self, flux_name: str):
for schema in self._repositories[repository_name].schemas():
for table in self._repositories[repository_name].tables(schema):
self._graph.add_node(Node(name=table))
@property @property
def fluxes(self) -> list[str]: def fluxes(self) -> list[str]:
return list(self._fluxes) return list(self._fluxes)

View File

@ -5,7 +5,6 @@ from pydantic import BaseModel
class Node(BaseModel): class Node(BaseModel):
name: str name: str
infos: dict = {}
def __hash__(self): def __hash__(self):
return hash(self.name) return hash(self.name)

View File

@ -86,11 +86,10 @@ class FSRepository(Repository):
"table": "{repo_name}-{schema_name}-{table_name}", "table": "{repo_name}-{schema_name}-{table_name}",
} }
def __init__(self, name: str, basepath: str, id: str): def __init__(self, id: str, name: str, basepath: str):
self._basepath = Path(basepath) super().__init__(id, name)
self.name = name
self.id = id
self._basepath = Path(basepath)
assert self._basepath.exists() assert self._basepath.exists()
def ls(self, dir="", only_files=False, only_directories=False, recursive=False) -> list[str]: def ls(self, dir="", only_files=False, only_directories=False, recursive=False) -> list[str]:

View File

@ -4,8 +4,17 @@ from plesna.models.storage import Partition, Schema, Table
class Repository: class Repository:
def __init__(self): def __init__(self, id: str, name: str):
pass self._id = id
self._name = name
# Accessor returning the value stored in `self._id`; no setter is visible in
# this hunk.
@property
def id(self) -> str:
return self._id
# Accessor returning the value stored in `self._name`; no setter is visible in
# this hunk.
@property
def name(self) -> str:
return self._name
@abc.abstractmethod @abc.abstractmethod
def schemas(self) -> list[str]: def schemas(self) -> list[str]:

View File

@ -4,6 +4,7 @@ from pathlib import Path
import pytest import pytest
from plesna.dataplatform import DataPlateform from plesna.dataplatform import DataPlateform
from plesna.graph.graph import Node
from plesna.models.flux import Flux from plesna.models.flux import Flux
from plesna.models.transformation import Transformation from plesna.models.transformation import Transformation
from plesna.storage.repository.fs_repository import FSRepository from plesna.storage.repository.fs_repository import FSRepository
@ -24,38 +25,20 @@ def repository(tmp_path) -> FSRepository:
silver_path = Path(tmp_path) / "silver" silver_path = Path(tmp_path) / "silver"
silver_path.mkdir() silver_path.mkdir()
return FSRepository("test", tmp_path, "test") return FSRepository("test", "test", tmp_path)
def test_add_repository( def test_add_repository(
repository: FSRepository, repository: FSRepository,
): ):
dp = DataPlateform() dp = DataPlateform()
dp.add_repository("test", repository) dp.add_repository(repository)
assert dp.repositories == ["test"] assert dp.repositories == ["test"]
assert dp.repository("test") == repository assert dp.repository("test") == repository
@pytest.fixture
def foo_flux(repository: FSRepository) -> Flux:
src = {"username": repository.table("test-raw-username")}
targets = {"username": repository.table("test-bronze-username")}
def foo(sources, targets):
return {"who": "foo"}
extra_kwrds = {}
flux = Flux(
sources=src,
targets=targets,
transformation=Transformation(function=foo, extra_kwrds=extra_kwrds),
)
return flux
@pytest.fixture @pytest.fixture
def copy_flux(repository: FSRepository) -> Flux: def copy_flux(repository: FSRepository) -> Flux:
raw_username = {"username": repository.table("test-raw-username")} raw_username = {"username": repository.table("test-raw-username")}
@ -77,9 +60,30 @@ def copy_flux(repository: FSRepository) -> Flux:
return raw_brz_copy_username return raw_brz_copy_username
@pytest.fixture
def foo_flux(repository: FSRepository) -> Flux:
    """Build a flux with two raw sources, one bronze target and a constant transformation."""

    def foo(sources, targets):
        # Ignores its inputs and always returns the same payload.
        return {"who": "foo"}

    return Flux(
        sources={
            "username": repository.table("test-raw-username"),
            "recovery": repository.table("test-raw-recovery"),
        },
        targets={"username_foo": repository.table("test-bronze-foo")},
        transformation=Transformation(function=foo, extra_kwrds={}),
    )
def test_add_flux(repository: FSRepository, copy_flux: Flux): def test_add_flux(repository: FSRepository, copy_flux: Flux):
dataplatform = DataPlateform() dataplatform = DataPlateform()
dataplatform.add_repository("test", repository) dataplatform.add_repository(repository)
dataplatform.add_flux(name="copy_flux", flux=copy_flux) dataplatform.add_flux(name="copy_flux", flux=copy_flux)
assert dataplatform.fluxes == ["copy_flux"] assert dataplatform.fluxes == ["copy_flux"]
@ -98,7 +102,7 @@ def dataplatform(
) -> DataPlateform: ) -> DataPlateform:
dp = DataPlateform() dp = DataPlateform()
dp.add_repository("test", repository) dp.add_repository(repository)
dp.add_flux("foo", foo_flux) dp.add_flux("foo", foo_flux)
dp.add_flux("raw_brz_copy_username", copy_flux) dp.add_flux("raw_brz_copy_username", copy_flux)
@ -121,8 +125,16 @@ def test_listing_content(dataplatform: DataPlateform):
def test_content_from_graph(dataplatform: DataPlateform): def test_content_from_graph(dataplatform: DataPlateform):
# assert dataplatform.graphset.model_dump() == {} assert dataplatform.graph.nodes == {
pass Node(name="test-raw-recovery", infos={}),
Node(name="test-raw-salary", infos={}),
Node(name="test-raw-username", infos={}),
}
# assert dataplatform.graphset.node_sets == {
# Node(name="test-raw-username", infos={}),
# Node(name="test-bronze-username", infos={}),
# }
def test_execute_flux(dataplatform: DataPlateform): def test_execute_flux(dataplatform: DataPlateform):

View File

@ -20,7 +20,7 @@ def location(tmp_path):
def test_init(location): def test_init(location):
repo = FSRepository("example", location, "example") repo = FSRepository("example", "example", location)
assert repo.ls() == [ assert repo.ls() == [
"schema", "schema",
] ]
@ -45,7 +45,7 @@ def test_init(location):
@pytest.fixture @pytest.fixture
def repository(location) -> FSRepository: def repository(location) -> FSRepository:
return FSRepository("example", location, "example") return FSRepository("example", "example", location)
def test_list_schemas(repository): def test_list_schemas(repository):