diff --git a/plesna/storage/metadata_repository/fs_metadata_repository.py b/plesna/storage/metadata_repository/fs_metadata_repository.py
index a2a1de3..4d9f408 100644
--- a/plesna/storage/metadata_repository/fs_metadata_repository.py
+++ b/plesna/storage/metadata_repository/fs_metadata_repository.py
@@ -1,11 +1,12 @@
 from pathlib import Path
+from datetime import datetime
 import csv
 import json
 
 from plesna.storage.metadata_repository.metadata_repository import (
-    ExecutionMetaData,
+    ExecutionLog,
     MetaDataRepository,
-    ModificationMetaData,
+    ModificationLog,
 )
 
 
@@ -38,28 +39,28 @@ class FSMetaDataRepository(MetaDataRepository):
         filepath = self._basepath / self.EXECUTION_FILEMODEL.format(flux_id=flux_id)
         filepath.touch()
         with open(filepath, "a") as csvfile:
-            writer = csv.DictWriter(csvfile, fieldnames=ExecutionMetaData.model_fields.keys())
+            writer = csv.DictWriter(csvfile, fieldnames=ExecutionLog.model_fields.keys())
             writer.writeheader()
         return flux_id
 
-    def register_flux_execution(self, flux_id: str, metadata: dict) -> ExecutionMetaData:
-        """Get the flux metadata"""
+    def register_flux_execution(self, flux_id: str, dt: datetime, output: dict) -> ExecutionLog:
+        """Append an execution log to the flux's CSV file and return it"""
         filepath = self._basepath / self.EXECUTION_FILEMODEL.format(flux_id=flux_id)
-        metadata_ = ExecutionMetaData(**metadata)
-        if not filepath.exists:
+        metadata_ = ExecutionLog(datetime=dt, output={"data": output})
+        if not filepath.exists():
             raise FSMetaDataRepositoryError(f"The flux {flux_id} hasn't been added yet.")
 
         with open(filepath, "a") as csvfile:
-            writer = csv.DictWriter(csvfile, fieldnames=ExecutionMetaData.model_fields.keys())
+            writer = csv.DictWriter(csvfile, fieldnames=ExecutionLog.model_fields.keys())
             writer.writerow(metadata_.to_flat_dict())
 
         return metadata_
 
-    def flux(self, flux_id: str) -> ExecutionMetaData:
+    def flux(self, flux_id: str) -> ExecutionLog:
         """Get the flux metadata"""
         raise NotImplementedError
 
-    def flux_logs(self, flux_id: str) -> list[ExecutionMetaData]:
+    def flux_logs(self, flux_id: str) -> list[ExecutionLog]:
         """Get the flux metadata"""
         filepath = self._basepath / self.EXECUTION_FILEMODEL.format(flux_id=flux_id)
         if not filepath.exists:
@@ -69,13 +70,13 @@ class FSMetaDataRepository(MetaDataRepository):
             reader = csv.DictReader(csvfile)
             for row in reader:
                 row["output"] = json.loads(row["output"])
-                logs.append(ExecutionMetaData(**row))
+                logs.append(ExecutionLog(**row))
         return logs
 
     def tables(self) -> list[str]:
         """List table's name in schema (the id)"""
         raise NotImplementedError
 
-    def table(self, table_id: str) -> ModificationMetaData:
+    def table(self, table_id: str) -> ModificationLog:
         """Get table's metadatas"""
         raise NotImplementedError
diff --git a/plesna/storage/metadata_repository/metadata_repository.py b/plesna/storage/metadata_repository/metadata_repository.py
index 151749d..f7a7def 100644
--- a/plesna/storage/metadata_repository/metadata_repository.py
+++ b/plesna/storage/metadata_repository/metadata_repository.py
@@ -6,12 +6,12 @@ from pydantic import BaseModel
 from plesna.models.flux import FluxMetaData
 
 
-class ModificationMetaData(BaseModel):
+class ModificationLog(BaseModel):
     datetime: datetime
     flux_id: str
 
 
-class ExecutionMetaData(BaseModel):
+class ExecutionLog(BaseModel):
     datetime: datetime
     output: FluxMetaData
 
@@ -36,17 +36,17 @@ class MetaDataRepository:
         raise NotImplementedError
 
     @abc.abstractmethod
-    def register_flux_execution(self, flux_id: str, metadata: ExecutionMetaData) -> str:
+    def register_flux_execution(self, flux_id: str, dt: datetime, metadata: dict) -> str:
         """Get the flux metadata"""
         raise NotImplementedError
 
     @abc.abstractmethod
-    def flux(self, schema_id: str) -> ExecutionMetaData:
+    def flux(self, schema_id: str) -> ExecutionLog:
         """Get the flux last execution metadata"""
         raise NotImplementedError
 
     @abc.abstractmethod
-    def flux_logs(self, schema_id: str) -> list[ExecutionMetaData]:
+    def flux_logs(self, schema_id: str) -> list[ExecutionLog]:
         """Get all the flux execution metadata"""
         raise NotImplementedError
 
@@ -63,16 +63,16 @@ class MetaDataRepository:
         raise NotImplementedError
 
     @abc.abstractmethod
-    def register_table_modification(self, table_id: str, metadata: ModificationMetaData) -> str:
+    def register_table_modification(self, table_id: str, dt: datetime, metadata: dict) -> str:
         """Get the table metadata"""
         raise NotImplementedError
 
     @abc.abstractmethod
-    def table(self, table_id: str) -> ModificationMetaData:
+    def table(self, table_id: str) -> ModificationLog:
         """Get the last table's modification metadatas"""
         raise NotImplementedError
 
     @abc.abstractmethod
-    def table_logs(self, table_id: str) -> list[ModificationMetaData]:
+    def table_logs(self, table_id: str) -> list[ModificationLog]:
         """Get all table's modification metadatas"""
         raise NotImplementedError
diff --git a/tests/storage/test_fs_metadata_repository.py b/tests/storage/test_fs_metadata_repository.py
index a2d18b2..8e16ab1 100644
--- a/tests/storage/test_fs_metadata_repository.py
+++ b/tests/storage/test_fs_metadata_repository.py
@@ -6,7 +6,7 @@ import pytest
 
 from plesna.models.flux import FluxMetaData
 from plesna.storage.metadata_repository.fs_metadata_repository import FSMetaDataRepository
-from plesna.storage.metadata_repository.metadata_repository import ExecutionMetaData
+from plesna.storage.metadata_repository.metadata_repository import ExecutionLog
 
 
 @pytest.fixture
@@ -44,9 +44,9 @@ def test_register_flux_execution(location, metadata_repository):
 
     metadata_repository.register_flux_execution(
         flux_id,
-        metadata={
-            "datetime": datetime(2023, 3, 15, 14, 30),
-            "output": {"data": {"truc": "machin"}},
+        datetime(2023, 3, 15, 14, 30),
+        output={
+            "truc": "machin",
         },
     )
 
@@ -64,26 +64,24 @@ def test_register_and_get_flux_execution(location, metadata_repository):
 
     metadata_repository.register_flux_execution(
         flux_id,
-        metadata={
-            "datetime": datetime(2023, 3, 15, 14, 30),
-            "output": {"data": {"truc": "machin"}},
-        },
+        datetime(2023, 3, 15, 14, 30),
+        output={"truc": "machin"},
     )
     metadata_repository.register_flux_execution(
         flux_id,
-        metadata={
-            "datetime": datetime(2024, 3, 15, 14, 30),
-            "output": {"data": {"truc": "chose"}},
+        datetime(2024, 3, 15, 14, 30),
+        output={
+            "truc": "chose",
         },
     )
 
     logs = metadata_repository.flux_logs(flux_id)
     assert logs == [
-        ExecutionMetaData(
+        ExecutionLog(
             datetime=datetime(2023, 3, 15, 14, 30),
             output=FluxMetaData(data={"truc": "machin"}),
         ),
-        ExecutionMetaData(
+        ExecutionLog(
             datetime=datetime(2024, 3, 15, 14, 30),
             output=FluxMetaData(data={"truc": "chose"}),
         ),