From 86f0dcc49e6377811d95a3b491964c41a59728e7 Mon Sep 17 00:00:00 2001 From: Bertrand Benjamin Date: Sat, 4 Jan 2025 21:33:05 +0100 Subject: [PATCH] Feat: execute flux on dataplatform --- plesna/models/storage.py | 5 ++++ plesna/storage/repository/fs_repository.py | 14 ++++++----- tests/compute/test_consume_flux.py | 8 +++--- tests/dataplatform/test_dataplateform.py | 29 +++++++++++++++------- 4 files changed, 37 insertions(+), 19 deletions(-) diff --git a/plesna/models/storage.py b/plesna/models/storage.py index 788a0e5..395ffa4 100644 --- a/plesna/models/storage.py +++ b/plesna/models/storage.py @@ -25,6 +25,10 @@ class Table(BaseModel): schema_id: id of the schema where table belong to name: the name of the table value: string which describe where to find the table in the storage system + + partitions: list of partitions + datas: list of string to access data + """ id: str @@ -33,6 +37,7 @@ class Table(BaseModel): name: str value: str partitions: list[str] = [] + datas: list[str] class Partition(BaseModel): diff --git a/plesna/storage/repository/fs_repository.py b/plesna/storage/repository/fs_repository.py index cb8056f..063ad8b 100644 --- a/plesna/storage/repository/fs_repository.py +++ b/plesna/storage/repository/fs_repository.py @@ -32,6 +32,11 @@ class FSTable(BaseModel): @computed_field @property def ref(self) -> Table: + if self.is_partitionned: + datas = [str(self.path.absolute() / p) for p in self.partitions] + else: + datas = [str(self.path.absolute())] + return Table( id=str(self.path), repo_id=str(self.path.parent.parent), @@ -39,6 +44,7 @@ class FSTable(BaseModel): name=self.name, value=str(self.path.absolute()), partitions=self.partitions, + datas=datas, ) @@ -75,9 +81,7 @@ class FSRepository(Repository): assert self._basepath.exists() - def ls( - self, dir="", only_files=False, only_directories=False, recursive=False - ) -> list[str]: + def ls(self, dir="", only_files=False, only_directories=False, recursive=False) -> list[str]: """List files in dir :param dir: relative path from self._basepath @@ -106,9 +110,7 @@ class FSRepository(Repository): if f.is_dir() and not str(f).startswith(".") ] - return [ - str(f.relative_to(dirpath)) for f in paths if not str(f).startswith(".") - ] + return [str(f.relative_to(dirpath)) for f in paths if not str(f).startswith(".")] def schemas(self) -> list[str]: """List schemas (sub directories within basepath)""" diff --git a/tests/compute/test_consume_flux.py b/tests/compute/test_consume_flux.py index 9422bff..9582d81 100644 --- a/tests/compute/test_consume_flux.py +++ b/tests/compute/test_consume_flux.py @@ -7,18 +7,18 @@ from plesna.models.transformation import Transformation def test_consume_flux(): sources = { "src1": Table( - id="src1", repo_id="test", schema_id="test", name="test", value="here" + id="src1", repo_id="test", schema_id="test", name="test", value="here", datas=["d"] ), "src2": Table( - id="src2", repo_id="test", schema_id="test", name="test", value="here" + id="src2", repo_id="test", schema_id="test", name="test", value="here", datas=["d"] ), } targets = { "tgt1": Table( - id="tgt1", repo_id="test", schema_id="test", name="test", value="this" + id="tgt1", repo_id="test", schema_id="test", name="test", value="this", datas=["d"] ), "tgt2": Table( - id="tgt2", repo_id="test", schema_id="test", name="test", value="that" + id="tgt2", repo_id="test", schema_id="test", name="test", value="that", datas=["d"] ), } diff --git a/tests/dataplatform/test_dataplateform.py b/tests/dataplatform/test_dataplateform.py index 69e8ca5..d0ec044 100644 --- a/tests/dataplatform/test_dataplateform.py +++ b/tests/dataplatform/test_dataplateform.py @@ -29,7 +29,7 @@ def repository(tmp_path) -> FSRepository: for f in example_src.glob("*"): if "recovery" in str(f): shutil.copy(f, recovery_loc) - if "salary" in str(f): + elif "salary" in str(f): shutil.copy(f, salary_loc) else: shutil.copy(f, username_loc) @@ -73,20 +73,23 @@ def foo_flux(repository: FSRepository) -> Flux: @pytest.fixture def copy_flux(repository: FSRepository) -> Flux: - src = {"username": repository.table("raw", "username")} - targets = {"username": repository.table("bronze", "username")} + raw_username = {"username": repository.table("raw", "username")} + bronze_username = {"username": repository.table("bronze", "username")} def copy(sources, targets): - pass + src_path = Path(sources["username"].datas[0]) + tgt_path = Path(targets["username"].datas[0]) + shutil.copy(src_path, tgt_path) + return {"src_size": src_path.stat().st_size, "tgt_size": tgt_path.stat().st_size} extra_kwrds = {} - flux = Flux( - sources=src, - targets=targets, + raw_brz_copy_username = Flux( + sources=raw_username, + targets=bronze_username, transformation=Transformation(function=copy, extra_kwrds=extra_kwrds), ) - return flux + return raw_brz_copy_username def test_add_flux(repository: FSRepository, copy_flux: Flux): @@ -113,7 +116,7 @@ def dataplatform( dp.add_repository("test", repository) dp.add_flux("foo", foo_flux) - dp.add_flux("copy", copy_flux) + dp.add_flux("raw_brz_copy_username", copy_flux) return dp @@ -124,8 +127,16 @@ def test_listing_content(dataplatform: DataPlateform): "username", "salary", ] + assert dataplatform.repository("test").table("raw", "username").partitions == ["username.csv"] def test_execute_flux(dataplatform: DataPlateform): meta = dataplatform.execute_flux("foo") assert meta.data == {"who": "foo"} + + assert dataplatform.repository("test").schema("bronze").tables == [] + + meta = dataplatform.execute_flux("raw_brz_copy_username") + assert meta.data == {"src_size": 175, "tgt_size": 175} + + assert dataplatform.repository("test").schema("bronze").tables == ["username"]