diff --git a/plesna/storage/repository/fs_repository.py b/plesna/storage/repository/fs_repository.py index 8f19cc3..1ada1cd 100644 --- a/plesna/storage/repository/fs_repository.py +++ b/plesna/storage/repository/fs_repository.py @@ -149,14 +149,11 @@ class FSRepository(Repository): repo_name = parsed["repo_name"] schema_name = parsed["schema_name"] + schema_path = self._basepath / schema_name if repo_name != self.name: raise FSRepositoryError("Trying to get schema that don't belong in this repository") - schema_path = self._basepath / schema_name - if not schema_path.exists(): - raise FSRepositoryError(f"The schema {schema_name} does not exists") - tables = self.tables(schema_id) return FSSchema(name=schema_name, id=schema_id, path=schema_path, tables=tables) @@ -166,7 +163,7 @@ class FSRepository(Repository): def _tables(self, schema_id: str) -> list[str]: parsed = self.parse_id(schema_id, "schema") tables = self.ls(parsed["schema_name"]) - return tables + return [self.ID_FMT["table"].format(table_name=t, **parsed) for t in tables] def tables(self, schema_id: str = "") -> list[str]: if schema_id: @@ -183,10 +180,12 @@ class FSRepository(Repository): if parsed["repo_name"] != self.name: raise FSRepositoryError("Trying to get table that don't belong in this repository") + schema_path = self._basepath / parsed["schema_name"] + if not schema_path.exists(): + raise FSRepositoryError(f"The schema {parsed['schema_name']} does not exists.") + table_subpath = f"{parsed['schema_name']}/{parsed['table_name']}" table_path = self._basepath / table_subpath - if not table_path.exists(): - raise FSRepositoryError(f"The table {parsed['table_name']} does not exists") is_partitionned = table_path.is_dir() if is_partitionned: diff --git a/plesna/storage/repository/repository.py b/plesna/storage/repository/repository.py index 586378a..e679781 100644 --- a/plesna/storage/repository/repository.py +++ b/plesna/storage/repository/repository.py @@ -23,6 +23,6 @@ class Repository: raise NotImplementedError @abc.abstractmethod - def table(self, table_id) -> Table: + def table(self, table_id: str) -> Table: """Get the table properties (the id)""" raise NotImplementedError diff --git a/tests/dataplatform/test_dataplateform.py b/tests/dataplatform/test_dataplateform.py index 081c487..678f38b 100644 --- a/tests/dataplatform/test_dataplateform.py +++ b/tests/dataplatform/test_dataplateform.py @@ -40,8 +40,8 @@ def test_add_repository( @pytest.fixture def foo_flux(repository: FSRepository) -> Flux: - src = {"username": repository.table("raw", "username")} - targets = {"username": repository.table("bronze", "username")} + src = {"username": repository.table("test-raw-username")} + targets = {"username": repository.table("test-bronze-username")} def foo(sources, targets): return {"who": "foo"} @@ -58,8 +58,8 @@ def foo_flux(repository: FSRepository) -> Flux: @pytest.fixture def copy_flux(repository: FSRepository) -> Flux: - raw_username = {"username": repository.table("raw", "username")} - bronze_username = {"username": repository.table("bronze", "username")} + raw_username = {"username": repository.table("test-raw-username")} + bronze_username = {"username": repository.table("test-bronze-username")} def copy(sources, targets): src_path = Path(sources["username"].datas[0]) @@ -106,27 +106,32 @@ def dataplatform( def test_listing_content(dataplatform: DataPlateform): - assert dataplatform.repository("test").schemas() == ["raw", "bronze", "silver"] - assert dataplatform.repository("test").schema("raw").tables == [ - "username", - "recovery", - "salary", + assert dataplatform.repository("test").schemas() == ["test-raw", "test-bronze", "test-silver"] + assert dataplatform.repository("test").schema("test-raw").tables == [ + "test-raw-username", + "test-raw-recovery", + "test-raw-salary", ] - assert dataplatform.repository("test").table("raw", "username").partitions == ["username.csv"] - assert dataplatform.repository("test").table("raw", "recovery").partitions == [ + assert dataplatform.repository("test").table("test-raw-username").partitions == ["username.csv"] + assert dataplatform.repository("test").table("test-raw-recovery").partitions == [ "2022.csv", "2023.csv", "2024.csv", ] +def test_content_from_graph(dataplatform: DataPlateform): + # assert dataplatform.graphset.model_dump() == {} + pass + + def test_execute_flux(dataplatform: DataPlateform): meta = dataplatform.execute_flux("foo") assert meta.data == {"who": "foo"} - assert dataplatform.repository("test").schema("bronze").tables == [] + assert dataplatform.repository("test").schema("test-bronze").tables == [] meta = dataplatform.execute_flux("raw_brz_copy_username") assert meta.data == {"src_size": 283, "tgt_size": 283} - assert dataplatform.repository("test").schema("bronze").tables == ["username"] + assert dataplatform.repository("test").schema("test-bronze").tables == ["test-bronze-username"] diff --git a/tests/storage/test_fs_repository.py b/tests/storage/test_fs_repository.py index 5910d72..b6feabc 100644 --- a/tests/storage/test_fs_repository.py +++ b/tests/storage/test_fs_repository.py @@ -58,13 +58,29 @@ def test_describe_schema(location, repository): assert schema.id == "example-schema" assert schema.repo_id == str(location) assert schema.value == str(location / "schema") - assert schema.tables == ["username", "recovery", "salary"] + assert schema.tables == [ + "example-schema-username", + "example-schema-recovery", + "example-schema-salary", + ] def test_list_tables_schema(repository): - assert repository.schema("example-schema").tables == ["username", "recovery", "salary"] - assert repository.tables("example-schema") == ["username", "recovery", "salary"] - assert repository.tables() == ["username", "recovery", "salary"] + assert repository.schema("example-schema").tables == [ + "example-schema-username", + "example-schema-recovery", + "example-schema-salary", + ] + assert repository.tables("example-schema") == [ + "example-schema-username", + "example-schema-recovery", + "example-schema-salary", + ] + assert repository.tables() == [ + "example-schema-username", + "example-schema-recovery", + "example-schema-salary", + ] def test_describe_table(location, repository):