From 543b3fe98ebe5804da14797c230d8c1c16d6399d Mon Sep 17 00:00:00 2001
From: Bertrand Benjamin
Date: Wed, 15 Jan 2025 06:56:46 +0100
Subject: [PATCH] Feat: start working on metadata_repository

---
Review note: the blob ids on the "index" lines predate the review edits below.

 .../storage/metadata_repository/__init__.py   |   0
 .../fs_metadata_repository.py                 | 120 ++++++++++++++++++
 .../metadata_repository.py                    |  88 +++++++++++++
 tests/storage/test_fs_metadata_repository.py  |  87 +++++++++++++
 4 files changed, 295 insertions(+)
 create mode 100644 plesna/storage/metadata_repository/__init__.py
 create mode 100644 plesna/storage/metadata_repository/fs_metadata_repository.py
 create mode 100644 plesna/storage/metadata_repository/metadata_repository.py
 create mode 100644 tests/storage/test_fs_metadata_repository.py

diff --git a/plesna/storage/metadata_repository/__init__.py b/plesna/storage/metadata_repository/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/plesna/storage/metadata_repository/fs_metadata_repository.py b/plesna/storage/metadata_repository/fs_metadata_repository.py
new file mode 100644
index 0000000..3714a49
--- /dev/null
+++ b/plesna/storage/metadata_repository/fs_metadata_repository.py
@@ -0,0 +1,120 @@
+from pathlib import Path
+import csv
+import json
+
+from plesna.storage.metadata_repository.metadata_repository import (
+    ExecutionMetaData,
+    MetaDataRepository,
+    ModificationMetaData,
+)
+
+
+class FSMetaDataRepositoryError(ValueError):
+    """Raised when the base directory or a flux's csv file is missing."""
+
+
+class FSMetaDataRepository(MetaDataRepository):
+    """MetaData Repository based on csv files
+
+    Files organisation: each flux's executions are stored in a csv file of the
+    base directory named after EXECUTION_FILEMODEL. MODIFICATION_FILEMODEL is the
+    matching name pattern for table modifications; no method uses it yet.
+    """
+
+    EXECUTION_FILEMODEL = "{flux_id}_execution.csv"
+    MODIFICATION_FILEMODEL = "{table_id}_modification.csv"
+
+    def __init__(self, basepath: str):
+        """Bind the repository to an existing directory.
+
+        Raises:
+            FSMetaDataRepositoryError: if ``basepath`` does not exist.
+        """
+        super().__init__()
+
+        self._basepath = Path(basepath)
+        # Raise instead of assert: assert statements are removed under "python -O".
+        if not self._basepath.exists():
+            raise FSMetaDataRepositoryError(f"The base path {basepath} doesn't exist.")
+
+    def _execution_filepath(self, flux_id: str) -> Path:
+        """Return the path of the csv file storing the executions of a flux."""
+        return self._basepath / self.EXECUTION_FILEMODEL.format(flux_id=flux_id)
+
+    def fluxes(self) -> list[str]:
+        """List fluxes's ids"""
+        raise NotImplementedError
+
+    def add_flux(self, flux_id: str) -> str:
+        """Create the execution csv file of a flux and return the flux id.
+
+        The header row is written only when the file does not exist yet, so
+        adding the same flux twice does not append a second header.
+        """
+        filepath = self._execution_filepath(flux_id)
+        if not filepath.exists():
+            # newline="" leaves line endings to the csv module.
+            with open(filepath, "a", newline="") as csvfile:
+                writer = csv.DictWriter(csvfile, fieldnames=list(ExecutionMetaData.model_fields))
+                writer.writeheader()
+        return flux_id
+
+    def register_flux_execution(self, flux_id: str, metadata: dict) -> ExecutionMetaData:
+        """Validate an execution record and append it to the flux's csv file.
+
+        Args:
+            flux_id: id of a flux already created with add_flux.
+            metadata: keyword arguments used to build an ExecutionMetaData.
+
+        Returns:
+            The ExecutionMetaData built from metadata.
+
+        Raises:
+            FSMetaDataRepositoryError: if the flux's csv file does not exist.
+        """
+        filepath = self._execution_filepath(flux_id)
+        metadata_ = ExecutionMetaData(**metadata)
+        # exists() has to be called: the bare bound method is always truthy.
+        if not filepath.exists():
+            raise FSMetaDataRepositoryError(f"The flux {flux_id} hasn't been added yet.")
+
+        with open(filepath, "a", newline="") as csvfile:
+            writer = csv.DictWriter(csvfile, fieldnames=list(ExecutionMetaData.model_fields))
+            writer.writerow(metadata_.to_flat_dict())
+
+        return metadata_
+
+    def flux(self, flux_id: str) -> ExecutionMetaData:
+        """Get the flux last execution metadata"""
+        raise NotImplementedError
+
+    def flux_logs(self, flux_id: str) -> list[ExecutionMetaData]:
+        """Get all the execution metadata stored for a flux.
+
+        Raises:
+            FSMetaDataRepositoryError: if the flux's csv file does not exist.
+        """
+        filepath = self._execution_filepath(flux_id)
+        if not filepath.exists():
+            raise FSMetaDataRepositoryError(f"The flux {flux_id} hasn't been added yet.")
+        with open(filepath, "r", newline="") as csvfile:
+            # Without explicit fieldnames the header row names the columns; passing
+            # fieldnames would make the header row come back as a data row.
+            reader = csv.DictReader(csvfile)
+            # Reverse ExecutionMetaData.to_flat_dict: the datetime column holds a
+            # timestamp and the output column a JSON document.
+            return [
+                ExecutionMetaData(
+                    datetime=float(row["datetime"]),
+                    output=json.loads(row["output"]),
+                )
+                for row in reader
+            ]
+
+    def tables(self) -> list[str]:
+        """List table's name in schema (the id)"""
+        raise NotImplementedError
+
+    def table(self, table_id: str) -> ModificationMetaData:
+        """Get table's metadatas"""
+        raise NotImplementedError
diff --git a/plesna/storage/metadata_repository/metadata_repository.py b/plesna/storage/metadata_repository/metadata_repository.py
new file mode 100644
index 0000000..7ea66e5
--- /dev/null
+++ b/plesna/storage/metadata_repository/metadata_repository.py
@@ -0,0 +1,88 @@
+import abc
+from datetime import datetime
+
+from pydantic import BaseModel
+
+from plesna.models.flux import FluxMetaData
+
+
+class ModificationMetaData(BaseModel):
+    """Record of one modification of a table."""
+
+    datetime: datetime
+    flux_id: str
+
+
+class ExecutionMetaData(BaseModel):
+    """Record of one execution of a flux."""
+
+    datetime: datetime
+    output: FluxMetaData
+
+    def to_flat_dict(self) -> dict:
+        """Return a csv-friendly dict: datetime as a timestamp, output as JSON text."""
+        return {"datetime": self.datetime.timestamp(), "output": self.output.model_dump_json()}
+
+
+class MetaDataRepository:
+    """Object that stores metadata about flux, schema, tables
+
+    NOTE(review): the class does not derive from ``abc.ABC``, so the
+    ``abc.abstractmethod`` markers below are not enforced at instantiation;
+    each method simply raises NotImplementedError until it is overridden.
+    """
+
+    def __init__(self):
+        pass
+
+    @abc.abstractmethod
+    def fluxes(self) -> list[str]:
+        """List fluxes's ids"""
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def add_flux(self, flux_id: str) -> str:
+        """Add a flux to the repository"""
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def register_flux_execution(self, flux_id: str, metadata: ExecutionMetaData) -> str:
+        """Register an execution of the flux"""
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def flux(self, schema_id: str) -> ExecutionMetaData:
+        """Get the flux last execution metadata"""
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def flux_logs(self, schema_id: str) -> list[ExecutionMetaData]:
+        """Get all the flux execution metadata"""
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def tables(
+        self,
+    ) -> list[str]:
+        """List all table's ids"""
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def add_table(self, table_id: str) -> str:
+        """Add a table to the repository"""
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def register_table_modification(self, table_id: str, metadata: ModificationMetaData) -> str:
+        """Register a modification of the table"""
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def table(self, table_id: str) -> ModificationMetaData:
+        """Get the last table's modification metadatas"""
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def table_logs(self, table_id: str) -> list[ModificationMetaData]:
+        """Get all table's modification metadatas"""
+        raise NotImplementedError
diff --git a/tests/storage/test_fs_metadata_repository.py b/tests/storage/test_fs_metadata_repository.py
new file mode 100644
index 0000000..788425e
--- /dev/null
+++ b/tests/storage/test_fs_metadata_repository.py
@@ -0,0 +1,87 @@
+from datetime import datetime
+import shutil
+from pathlib import Path
+
+import pytest
+
+from plesna.storage.metadata_repository.fs_metadata_repository import (
+    FSMetaDataRepository,
+    FSMetaDataRepositoryError,
+)
+
+
+@pytest.fixture
+def location(tmp_path):
+    catalogpath = tmp_path / "catalog"
+    catalogpath.mkdir()
+
+    return catalogpath
+
+
+def test_init(location):
+    repo = FSMetaDataRepository(location)
+
+
+def test_init_missing_basepath(tmp_path):
+    with pytest.raises(FSMetaDataRepositoryError):
+        FSMetaDataRepository(tmp_path / "missing")
+
+
+@pytest.fixture
+def metadata_repository(location) -> FSMetaDataRepository:
+    return FSMetaDataRepository(location)
+
+
+def test_add_flux(location, metadata_repository):
+    flux_id = "my_flux"
+    metadata_repository.add_flux(flux_id)
+
+    metadata_filepath = location / metadata_repository.EXECUTION_FILEMODEL.format(flux_id=flux_id)
+    assert metadata_filepath.exists()
+
+    with open(metadata_filepath, "r") as csvfile:
+        content = csvfile.read()
+    assert content == "datetime,output\n"
+
+
+def test_add_flux_twice_keeps_single_header(location, metadata_repository):
+    flux_id = "my_flux"
+    metadata_repository.add_flux(flux_id)
+    metadata_repository.add_flux(flux_id)
+
+    metadata_filepath = location / metadata_repository.EXECUTION_FILEMODEL.format(flux_id=flux_id)
+    with open(metadata_filepath, "r") as csvfile:
+        content = csvfile.read()
+    assert content == "datetime,output\n"
+
+
+def test_register_flux_execution(location, metadata_repository):
+    flux_id = "my_flux"
+    metadata_repository.add_flux(flux_id)
+
+    metadata_repository.register_flux_execution(
+        flux_id,
+        metadata={
+            "datetime": datetime(2023, 3, 15, 14, 30),
+            "output": {"data": {"truc": "machin"}},
+        },
+    )
+
+    metadata_filepath = location / metadata_repository.EXECUTION_FILEMODEL.format(flux_id=flux_id)
+    with open(metadata_filepath, "r") as csvfile:
+        content = csvfile.read()
+    # A naive datetime's timestamp() depends on the local timezone, so the
+    # expected value is computed instead of hard-coded for one timezone.
+    timestamp = datetime(2023, 3, 15, 14, 30).timestamp()
+    assert content == f'datetime,output\n{timestamp},"{{""data"":{{""truc"":""machin""}}}}"\n'
+
+
+def test_register_flux_execution_unknown_flux(metadata_repository):
+    with pytest.raises(FSMetaDataRepositoryError):
+        metadata_repository.register_flux_execution(
+            "unknown_flux",
+            metadata={
+                "datetime": datetime(2023, 3, 15, 14, 30),
+                "output": {"data": {"truc": "machin"}},
+            },
+        )