diff --git a/plesna/storage/metadata_repository/fs_metadata_repository.py b/plesna/storage/metadata_repository/fs_metadata_repository.py index a7f5b3e..441b3ad 100644 --- a/plesna/storage/metadata_repository/fs_metadata_repository.py +++ b/plesna/storage/metadata_repository/fs_metadata_repository.py @@ -22,9 +22,9 @@ class FSMetaDataRepository(MetaDataRepository): """ - FILEMODEL = { - "execution": "{flux_id}_execution.csv", - "modification": "{table_id}_modification.csv", + OBJECTS = { + "flux": {"filemodel": "{id}_execution.csv", "logmodel": ExecutionLog}, + "table": {"filemodel": "{id}_execution.csv", "logmodel": ModificationLog}, } def __init__(self, basepath: str): @@ -33,47 +33,79 @@ class FSMetaDataRepository(MetaDataRepository): self._basepath = Path(basepath) assert self._basepath.exists() - def fluxes(self) -> list[str]: - """List fluxes's ids""" - fluxes = [] + def get_things(self, what: str) -> list[str]: + """List all ids for 'what'""" + whats = [] for filepath in self._basepath.iterdir(): try: - founded = extract_values_from_pattern(self.FILEMODEL["execution"], filepath.name) + founded = extract_values_from_pattern( + self.OBJECTS[what]["filemodel"], filepath.name + ) except StringToolsError: pass else: - fluxes.append(founded["flux_id"]) - return fluxes + whats.append(founded["id"]) + return whats + + def fluxes(self) -> list[str]: + """List fluxes's ids""" + return self.get_things(what="flux") + + def tables( + self, + ) -> list[str]: + """List all table's ids""" + return self.get_things(what="table") + + def _add_thing(self, what: str, id: str) -> str: + """Add the new things 'what'""" + filepath = self._basepath / self.OBJECTS[what]["filemodel"].format(id=id) + filepath.touch() + with open(filepath, "a") as csvfile: + writer = csv.DictWriter( + csvfile, fieldnames=self.OBJECTS[what]["logmodel"].model_fields.keys() + ) + writer.writeheader() + return id def add_flux(self, flux_id: str) -> str: """Get the flux metadata""" - filepath = self._basepath / self.FILEMODEL["execution"].format(flux_id=flux_id) - filepath.touch() - with open(filepath, "a") as csvfile: - writer = csv.DictWriter(csvfile, fieldnames=ExecutionLog.model_fields.keys()) - writer.writeheader() - return flux_id + return self._add_thing(what="flux", id=flux_id) - def register_flux_execution(self, flux_id: str, dt: datetime, output: dict) -> ExecutionLog: - """Get the flux metadata""" - filepath = self._basepath / self.FILEMODEL["execution"].format(flux_id=flux_id) - metadata_ = ExecutionLog(datetime=dt, output={"data": output}) + def add_table(self, table_id: str) -> str: + """Get the table metadata""" + return self._add_thing(what="table", id=table_id) + + def _register_things_event(self, what: str, id: str, dt: datetime, event: dict) -> ExecutionLog: + filepath = self._basepath / self.OBJECTS[what]["filemodel"].format(id=id) if not filepath.exists: - raise FSMetaDataRepositoryError(f"The flux {flux_id} hasn't been added yet.") + raise FSMetaDataRepositoryError(f"The {what} {id} hasn't been added yet.") + + metadata_ = self.OBJECTS[what]["logmodel"](datetime=dt, **event) with open(filepath, "a") as csvfile: - writer = csv.DictWriter(csvfile, fieldnames=ExecutionLog.model_fields.keys()) + writer = csv.DictWriter( + csvfile, fieldnames=self.OBJECTS[what]["logmodel"].model_fields.keys() + ) writer.writerow(metadata_.to_flat_dict()) return metadata_ + def register_flux_execution(self, flux_id: str, dt: datetime, output: dict) -> ExecutionLog: + """Get the flux metadata""" + return self._register_things_event("flux", flux_id, dt, {"output": {"data": output}}) + + def register_table_modification(self, table_id: str, dt: datetime, flux_id: str) -> str: + """Get the table metadata""" + return self._register_things_event("table", table_id, dt, {"flux_id": flux_id}) + def flux(self, flux_id: str) -> ExecutionLog: """Get the last flux log""" return max(self.flux_logs(flux_id), key=lambda l: l.datetime) def flux_logs(self, flux_id: str) -> list[ExecutionLog]: """Get all flux logs""" - filepath = self._basepath / self.FILEMODEL["execution"].format(flux_id=flux_id) + filepath = self._basepath / self.OBJECTS["flux"]["filemodel"].format(id=flux_id) if not filepath.exists: raise FSMetaDataRepositoryError(f"The flux {flux_id} hasn't been added yet.") with open(filepath, "r") as csvfile: @@ -84,10 +116,10 @@ class FSMetaDataRepository(MetaDataRepository): logs.append(ExecutionLog(**row)) return logs - def tables(self) -> list[str]: - """List table's name in schema (the id)""" + def table(self, table_id: str) -> ModificationLog: + """Get the last table's modification metadatas""" raise NotImplementedError - def table(self, table_id: str) -> ModificationLog: - """Get table's metadatas""" + def table_logs(self, table_id: str) -> list[ModificationLog]: + """Get all table's modification metadatas""" raise NotImplementedError diff --git a/plesna/storage/metadata_repository/metadata_repository.py b/plesna/storage/metadata_repository/metadata_repository.py index f7a7def..a7b6fe0 100644 --- a/plesna/storage/metadata_repository/metadata_repository.py +++ b/plesna/storage/metadata_repository/metadata_repository.py @@ -10,6 +10,9 @@ class ModificationLog(BaseModel): datetime: datetime flux_id: str + def to_flat_dict(self): + return {"datetime": self.datetime.isoformat(), "flux_id": self.flux_id} + class ExecutionLog(BaseModel): datetime: datetime diff --git a/tests/storage/test_fs_metadata_repository.py b/tests/storage/test_fs_metadata_repository.py index f96f7f3..f752102 100644 --- a/tests/storage/test_fs_metadata_repository.py +++ b/tests/storage/test_fs_metadata_repository.py @@ -6,7 +6,7 @@ import pytest from plesna.models.flux import FluxMetaData from plesna.storage.metadata_repository.fs_metadata_repository import FSMetaDataRepository -from plesna.storage.metadata_repository.metadata_repository import ExecutionLog +from plesna.storage.metadata_repository.metadata_repository import ExecutionLog, ModificationLog @pytest.fixture @@ -30,8 +30,8 @@ def test_add_flux(location, metadata_repository): flux_id = "my_flux" metadata_repository.add_flux(flux_id) - metadata_filepath = location / metadata_repository.FILEMODEL["execution"].format( - flux_id=flux_id + metadata_filepath = location / metadata_repository.OBJECTS["flux"]["filemodel"].format( + id=flux_id ) assert metadata_filepath.exists() @@ -40,6 +40,13 @@ def test_add_flux(location, metadata_repository): assert content == "datetime,output\n" +def test_add_and_list_fluxes(metadata_repository): + flux_ids = ["my_flux", "flux2", "blahblah"] + for f in flux_ids: + metadata_repository.add_flux(f) + assert metadata_repository.fluxes() == flux_ids + + def test_register_flux_execution(location, metadata_repository): flux_id = "my_flux" metadata_repository.add_flux(flux_id) @@ -52,8 +59,8 @@ def test_register_flux_execution(location, metadata_repository): }, ) - metadata_filepath = location / metadata_repository.FILEMODEL["execution"].format( - flux_id=flux_id + metadata_filepath = location / metadata_repository.OBJECTS["flux"]["filemodel"].format( + id=flux_id ) with open(metadata_filepath, "r") as csvfile: content = csvfile.read() @@ -116,8 +123,68 @@ def test_register_and_get_last_log(metadata_repository): ) -def test_register_and_list_fluxes(metadata_repository): - flux_ids = ["my_flux", "flux2", "blahblah"] - for f in flux_ids: - metadata_repository.add_flux(f) - assert metadata_repository.fluxes() == flux_ids +def test_add_and_list_tables(metadata_repository): + table_ids = ["my_table", "table2", "blahblah"] + for f in table_ids: + metadata_repository.add_table(f) + assert metadata_repository.tables() == table_ids + + +def test_register_table_modification(location, metadata_repository): + table_id = "my_table" + flux_id = "my_flux" + metadata_repository.add_table(table_id) + + metadata_repository.register_table_modification( + table_id, datetime(2023, 3, 15, 14, 30), flux_id + ) + + metadata_filepath = location / metadata_repository.OBJECTS["table"]["filemodel"].format( + id=table_id + ) + with open(metadata_filepath, "r") as csvfile: + content = csvfile.read() + assert content == "datetime,flux_id\n2023-03-15T14:30:00,my_flux\n" + + +# def test_register_and_get_logs(metadata_repository): +# table_id = "my_table" +# flux_id = "my_flux" +# metadata_repository.add_table(table_id) +# +# metadata_repository.register_table_modification( +# table_id, datetime(2023, 3, 15, 14, 30), flux_id +# ) +# metadata_repository.register_table_modification( +# table_id, datetime(2024, 3, 15, 14, 30), flux_id +# ) +# +# logs = metadata_repository.table_logs(table_id) +# assert logs == [ +# ModificationLog(datetime=datetime(2023, 3, 15, 14, 30), flux_is=flux_id), +# ModificationLog(datetime=datetime(2024, 3, 15, 14, 30), flux_is=flux_id), +# ] + + +# def test_register_and_get_last_log(metadata_repository): +# table_id = "my_table" +# metadata_repository.add_table(table_id) +# +# metadata_repository.register_table_modification( +# table_id, +# datetime(2023, 3, 15, 14, 30), +# output={"truc": "machin"}, +# ) +# metadata_repository.register_table_modification( +# table_id, +# datetime(2024, 3, 15, 14, 30), +# output={ +# "truc": "chose", +# }, +# ) +# +# logs = metadata_repository.table(table_id) +# assert logs == modificationLog( +# datetime=datetime(2024, 3, 15, 14, 30), +# output=TableMetaData(data={"truc": "chose"}), +# )