From e28ab332a7c0b4d20232873f9aeb4f60995b4ad6 Mon Sep 17 00:00:00 2001 From: Bertrand Benjamin Date: Fri, 3 Jan 2025 15:54:18 +0100 Subject: [PATCH] feat: move fs_datacatalogue to fs_repository --- plesna/datastore/datastore.py | 3 - plesna/datastore/fs_datacatalogue.py | 91 ----------- plesna/models/storage.py | 38 ++++- plesna/{datastore => storage}/__init__.py | 0 plesna/storage/datacatalogue.py | 24 +++ .../fake_datacatalogue.py | 0 .../storage/repository}/__init__.py | 0 plesna/storage/repository/fs_repository.py | 152 ++++++++++++++++++ .../repository/repository.py} | 18 ++- tests/datastore/test_fs_datacatalogue.py | 61 ------- tests/storage/__init__.py | 0 tests/storage/test_fs_repository.py | 77 +++++++++ 12 files changed, 298 insertions(+), 166 deletions(-) delete mode 100644 plesna/datastore/datastore.py delete mode 100644 plesna/datastore/fs_datacatalogue.py rename plesna/{datastore => storage}/__init__.py (100%) create mode 100644 plesna/storage/datacatalogue.py rename plesna/{datastore => storage}/fake_datacatalogue.py (100%) rename {tests/datastore => plesna/storage/repository}/__init__.py (100%) create mode 100644 plesna/storage/repository/fs_repository.py rename plesna/{datastore/datacatalogue.py => storage/repository/repository.py} (55%) delete mode 100644 tests/datastore/test_fs_datacatalogue.py create mode 100644 tests/storage/__init__.py create mode 100644 tests/storage/test_fs_repository.py diff --git a/plesna/datastore/datastore.py b/plesna/datastore/datastore.py deleted file mode 100644 index 818fecf..0000000 --- a/plesna/datastore/datastore.py +++ /dev/null @@ -1,3 +0,0 @@ -class DataStore: - def __init__(self, name): - self._name diff --git a/plesna/datastore/fs_datacatalogue.py b/plesna/datastore/fs_datacatalogue.py deleted file mode 100644 index 30a2c26..0000000 --- a/plesna/datastore/fs_datacatalogue.py +++ /dev/null @@ -1,91 +0,0 @@ -from pathlib import Path - -from pydantic import BaseModel, computed_field - -from plesna.models.storage import Schema, Table - -from .datacatalogue import DataCatalogue - - -class FSTable(BaseModel): - path: Path - - @computed_field - @property - def ref(self) -> Table: - return Table( - id=str(self.path), - value=str(self.path), - ) - - -class FSSchema(BaseModel): - path: Path - tables: list[str] - - @computed_field - @property - def ref(self) -> Schema: - return Schema( - id=str(self.path), - value=str(self.path), - ) - - - -class FSDataCatalogue(DataCatalogue): - """DataCatalogue based on files tree structure""" - - def __init__(self, name: str, basepath: str = "."): - self._basepath = Path(basepath) - self.name = name - - assert self._basepath.exists() - - def ls( - self, dir="", only_files=False, only_directories=False, recursive=False - ) -> list[str]: - dirpath = self._basepath / dir - - if only_files: - return [ - str(f.relative_to(dirpath)) - for f in dirpath.iterdir() - if not f.is_dir() and not str(f).startswith(".") - ] - - if only_directories: - if recursive: - return [ - str(f[0].relative_to(dirpath)) - for f in dirpath.walk() - if not str(f).startswith(".") - ] - - return [ - str(f.relative_to(dirpath)) - for f in dirpath.iterdir() - if f.is_dir() and not str(f).startswith(".") - ] - - return [ - str(f.relative_to(dirpath)) - for f in dirpath.iterdir() - if not str(f).startswith(".") - ] - - @property - def schemas(self) -> list[str]: - """List schemas (sub directories within basepath)""" - subdirectories = self.ls("", only_directories=True, recursive=True) - return [str(d) for d in subdirectories] - - def schema(self, schema: str) -> FSSchema: - """List schemas (sub directories within basepath)""" - tables = self.ls(schema, only_files=True) - return FSSchema(path=Path(schema), tables=tables) - - def table(self, schema: str, table:str) -> FSTable: - """List table in schema (which are files in the directory)""" - schema_path = schema_id - return {path: FSTable(path=path) for path in self.ls(schema_path, only_files=True)} diff --git a/plesna/models/storage.py b/plesna/models/storage.py index 73d44cc..788a0e5 100644 --- a/plesna/models/storage.py +++ b/plesna/models/storage.py @@ -2,24 +2,54 @@ from pydantic import BaseModel class Schema(BaseModel): - """Logical agregation for Table + """Where multiple tables are stored id: uniq identifier for the schema - value: string which describe where to find the schema in the storage system - + repo_id: id of the repo where the schema belong to + name: name of the schema + value: string which describe where to find the schema in the repository """ id: str + repo_id: str + name: str value: str + tables: list[str] = [] class Table(BaseModel): + """Place where same structured data are stored + + id: uniq identifier for the table + repo_id: id of the repo where the table belong to + schema_id: id of the schema where table belong to + name: the name of the table + value: string which describe where to find the table in the storage system + """ + + id: str + repo_id: str + schema_id: str + name: str + value: str + partitions: list[str] = [] + + +class Partition(BaseModel): """Place where data are stored id: uniq identifier for the table - value: string which describe where to find the table in the storage system + repo_id: id of the repo where the table belong to + schema_id: id of the schema where table belong to + table_id: id of the schema where table belong to + name: the name of the partition + value: string which describe where to find the partition in the storage system """ id: str + repo_id: str + schema_id: str + table_id: str + name: str value: str diff --git a/plesna/datastore/__init__.py b/plesna/storage/__init__.py similarity index 100% rename from plesna/datastore/__init__.py rename to plesna/storage/__init__.py diff --git a/plesna/storage/datacatalogue.py b/plesna/storage/datacatalogue.py new file mode 100644 index 0000000..1ae51de --- /dev/null +++ b/plesna/storage/datacatalogue.py @@ -0,0 +1,24 @@ +import abc + +from plesna.models.storage import Schema + + +class DataCatalogue: + def __init__(self): + pass + + @property + @abc.abstractmethod + def schemas(self) -> list[str]: + """List schema's names""" + raise NotImplementedError + + @abc.abstractmethod + def schema(self, name: str) -> Schema: + """Get the schema properties""" + raise NotImplementedError + + @abc.abstractmethod + def tables(self, schema: str) -> list[str]: + """List table's name in schema""" + raise NotImplementedError diff --git a/plesna/datastore/fake_datacatalogue.py b/plesna/storage/fake_datacatalogue.py similarity index 100% rename from plesna/datastore/fake_datacatalogue.py rename to plesna/storage/fake_datacatalogue.py diff --git a/tests/datastore/__init__.py b/plesna/storage/repository/__init__.py similarity index 100% rename from tests/datastore/__init__.py rename to plesna/storage/repository/__init__.py diff --git a/plesna/storage/repository/fs_repository.py b/plesna/storage/repository/fs_repository.py new file mode 100644 index 0000000..151db2b --- /dev/null +++ b/plesna/storage/repository/fs_repository.py @@ -0,0 +1,152 @@ +from pathlib import Path + +from pydantic import BaseModel, computed_field + +from plesna.models.storage import Partition, Schema, Table +from plesna.storage.repository.repository import Repository + + +class FSPartition(BaseModel): + name: str + path: Path + + @computed_field + @property + def ref(self) -> Partition: + return Partition( + id=str(self.path), + repo_id=str(self.path.parent.parent.parent), + schema_id=str(self.path.parent.parent), + table_id=str(self.path.parent), + name=self.name, + value=str(self.path.absolute()), + ) + + +class FSTable(BaseModel): + name: str + path: Path + is_partitionned: bool + partitions: list[str] = [] + + @computed_field + @property + def ref(self) -> Table: + return Table( + id=str(self.path), + repo_id=str(self.path.parent.parent), + schema_id=str(self.path.parent), + name=self.name, + value=str(self.path.absolute()), + partitions=self.partitions, + ) + + +class FSSchema(BaseModel): + name: str + path: Path + tables: list[str] + + @computed_field + @property + def ref(self) -> Schema: + return Schema( + id=str(self.path), + repo_id=str(self.path.parent), + name=self.name, + value=str(self.path.absolute()), + tables=self.tables, + ) + + +class FSRepository(Repository): + """Repository based on files tree structure + + - first level: schemas + - second level: tables + - third level: partition (actual datas) + + """ + + def __init__(self, name: str, basepath: str, id: str): + self._basepath = Path(basepath) + self.name = name + self.id = id + + assert self._basepath.exists() + + def ls( + self, dir="", only_files=False, only_directories=False, recursive=False + ) -> list[str]: + """List files in dir + + :param dir: relative path from self._basepath + :param only_files: if true return only files + :param only_directories: if true return only directories + :param recursive: list content recursively (only for) + :return: list of string describing path from self._basepath / dir + """ + dirpath = self._basepath / dir + + if recursive: + paths = dirpath.rglob("*") + else: + paths = dirpath.iterdir() + + if only_files: + return [ + str(f.relative_to(dirpath)) + for f in paths + if not f.is_dir() and not str(f).startswith(".") + ] + if only_directories: + return [ + str(f.relative_to(dirpath)) + for f in paths + if f.is_dir() and not str(f).startswith(".") + ] + + return [ + str(f.relative_to(dirpath)) for f in paths if not str(f).startswith(".") + ] + + def schemas(self) -> list[str]: + """List schemas (sub directories within basepath)""" + subdirectories = self.ls("", only_directories=True, recursive=True) + return [str(d) for d in subdirectories] + + def _schema(self, name: str) -> FSSchema: + """List schemas (sub directories within basepath)""" + schema_path = self._basepath / name + tables = self.ls(name) + return FSSchema(name=name, path=schema_path, tables=tables) + + def schema(self, name: str) -> Schema: + return self._schema(name).ref + + def _table(self, schema: str, name: str) -> FSTable: + """Get infos on the table""" + table_path = self._basepath / schema / name + is_partitionned = table_path.is_dir() + if is_partitionned: + partitions = self.ls(f"{schema}/{name}", only_files=True) + else: + partitions = [] + + return FSTable( + name=name, + path=table_path, + is_partitionned=is_partitionned, + partitions=partitions, + ) + + def table(self, schema: str, name: str) -> Table: + return self._table(schema, name).ref + + def _partition(self, schema: str, table: str, partition: str) -> FSPartition: + """Get infos on the partition""" + table_path = self._basepath / schema / table + return FSPartition(name=partition, table_path=table_path) + + def partition(self, schema: str, name: str) -> Partition: + return self._partition(schema, name).ref diff --git a/plesna/datastore/datacatalogue.py b/plesna/storage/repository/repository.py similarity index 55% rename from plesna/datastore/datacatalogue.py rename to plesna/storage/repository/repository.py index eb9cf9d..537816e 100644 --- a/plesna/datastore/datacatalogue.py +++ b/plesna/storage/repository/repository.py @@ -1,13 +1,12 @@ import abc -from plesna.models.storage import Schema, Table +from plesna.models.storage import Partition, Schema, Table -class DataCatalogue: +class Repository: def __init__(self): pass - @property @abc.abstractmethod def schemas(self) -> list[str]: """List schema's names""" @@ -19,16 +18,21 @@ class DataCatalogue: raise NotImplementedError @abc.abstractmethod - def tables(self, schema:str) -> list[str]: + def tables(self, schema: str) -> list[str]: """List table's name in schema""" raise NotImplementedError @abc.abstractmethod - def table(self, schema:str, table:str) -> Table: + def table(self, schema: str, name: str) -> Table: """Get the table properties""" raise NotImplementedError @abc.abstractmethod - def infos(self, table: str, schema: str) -> dict[str, str]: - """Get infos about the table""" + def partitions(self, schema: str, table: str) -> list[str]: + """List partition's name in table""" + raise NotImplementedError + + @abc.abstractmethod + def partition(self, schema: str, name: str, partition: str) -> Partition: + """Get the partition properties""" raise NotImplementedError diff --git a/tests/datastore/test_fs_datacatalogue.py b/tests/datastore/test_fs_datacatalogue.py deleted file mode 100644 index c2e2844..0000000 --- a/tests/datastore/test_fs_datacatalogue.py +++ /dev/null @@ -1,61 +0,0 @@ -import shutil -from pathlib import Path - -import pytest - -from plesna.datastore.fs_datacatalogue import FSDataCatalogue -from plesna.models.storage import Schema - -FIXTURE_DIR = Path(__file__).parent.parent / Path("./raw_datas/") - - -@pytest.fixture -def location(tmp_path): - loc = tmp_path - username_loc = loc / "username" - username_loc.mkdir() - salary_loc = loc / "salary" - salary_loc.mkdir() - example_src = FIXTURE_DIR - assert example_src.exists() - - for f in example_src.glob("*"): - if "username" in str(f): - shutil.copy(f, username_loc) - else: - shutil.copy(f, salary_loc) - - return loc - - -def test_init(location): - repo = FSDataCatalogue("example", location) - assert repo.ls() == [ - "username", - "salary", - ] - - assert repo.ls(recursive=True) == [ - "username", - "salary", - ] - - -def test_list_schema(location): - repo = FSDataCatalogue("example", location) - - assert repo.schemas == [".", "username", "salary"] - assert repo.schema(".").ref == Schema(id=".", value=".") - assert repo.schema("username").ref == Schema(id="username", value="username") - -def test_list_tables_schema(location): - repo = FSDataCatalogue("example", location) - - assert repo.schema(".").tables == [] - assert repo.schema("username").tables == [ - 'username.csv', - 'username-password-recovery-code.xlsx', - 'username-password-recovery-code.xls', - ] - assert repo.schema("salary").tables == ["salary.pdf"] - diff --git a/tests/storage/__init__.py b/tests/storage/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/storage/test_fs_repository.py b/tests/storage/test_fs_repository.py new file mode 100644 index 0000000..e91260f --- /dev/null +++ b/tests/storage/test_fs_repository.py @@ -0,0 +1,77 @@ +import shutil +from pathlib import Path + +import pytest + +from plesna.models.storage import Schema +from plesna.storage.repository.fs_repository import FSRepository + +FIXTURE_DIR = Path(__file__).parent.parent / Path("./raw_datas/") + + +@pytest.fixture +def location(tmp_path): + loc = tmp_path + username_loc = loc / "username" + username_loc.mkdir() + salary_loc = loc / "salary" + salary_loc.mkdir() + example_src = FIXTURE_DIR + assert example_src.exists() + + for f in example_src.glob("*"): + if "username" in str(f): + shutil.copy(f, username_loc) + else: + shutil.copy(f, salary_loc) + + return loc + + +def test_init(location): + repo = FSRepository("example", location, "example") + assert repo.ls() == [ + "username", + "salary", + ] + + assert repo.ls(recursive=True) == [ + "username", + "salary", + "username/username.csv", + "username/username-password-recovery-code.xlsx", + "username/username-password-recovery-code.xls", + "salary/salary.pdf", + ] + + +@pytest.fixture +def repository(location) -> FSRepository: + return FSRepository("example", location, "example") + + +def test_list_schema(location, repository): + assert repository.schemas() == ["username", "salary"] + assert repository.schema("username").name == "username" + assert repository.schema("username").id == str(location / "username") + assert repository.schema("username").repo_id == str(location) + assert repository.schema("username").value == str(location / "username") + + +def test_list_tables_schema(repository): + assert repository.schema("username").tables == [ + "username.csv", + "username-password-recovery-code.xlsx", + "username-password-recovery-code.xls", + ] + assert repository.schema("salary").tables == ["salary.pdf"] + + +def test_describe_table(location, repository): + table = repository.table("username", "username.csv") + assert table.id == str(location / "username" / "username.csv") + assert table.repo_id == str(location) + assert table.schema_id == str(location / "username") + assert table.name == "username.csv" + assert table.value == str(location / "username" / "username.csv") + assert table.partitions == []