From 478a8c2403c9e299158609b737de4551165ecfea Mon Sep 17 00:00:00 2001
From: Bertrand Benjamin <>
Date: Sat, 18 Jan 2025 07:31:30 +0100
Subject: [PATCH] Feat: register table modifications

 .../                 | 84 ++++++++++++------
 .../                    |  3 +
 tests/storage/  | 87 ++++++++++++++++---
 3 files changed, 138 insertions(+), 36 deletions(-)

diff --git a/plesna/storage/metadata_repository/ b/plesna/storage/metadata_repository/
index a7f5b3e..441b3ad 100644
--- a/plesna/storage/metadata_repository/
+++ b/plesna/storage/metadata_repository/
@@ -22,9 +22,9 @@ class FSMetaDataRepository(MetaDataRepository):
-    FILEMODEL = {
-        "execution": "{flux_id}_execution.csv",
-        "modification": "{table_id}_modification.csv",
+    OBJECTS = {
+        "flux": {"filemodel": "{id}_execution.csv", "logmodel": ExecutionLog},
+        "table": {"filemodel": "{id}_execution.csv", "logmodel": ModificationLog},
     def __init__(self, basepath: str):
@@ -33,47 +33,79 @@ class FSMetaDataRepository(MetaDataRepository):
         self._basepath = Path(basepath)
         assert self._basepath.exists()
-    def fluxes(self) -> list[str]:
-        """List fluxes's ids"""
-        fluxes = []
+    def get_things(self, what: str) -> list[str]:
+        """List all ids for 'what'"""
+        whats = []
         for filepath in self._basepath.iterdir():
-                founded = extract_values_from_pattern(self.FILEMODEL["execution"],
+                founded = extract_values_from_pattern(
+                    self.OBJECTS[what]["filemodel"],
+                )
             except StringToolsError:
-                fluxes.append(founded["flux_id"])
-        return fluxes
+                whats.append(founded["id"])
+        return whats
+    def fluxes(self) -> list[str]:
+        """List fluxes's ids"""
+        return self.get_things(what="flux")
+    def tables(
+        self,
+    ) -> list[str]:
+        """List all table's ids"""
+        return self.get_things(what="table")
+    def _add_thing(self, what: str, id: str) -> str:
+        """Add the new things 'what'"""
+        filepath = self._basepath / self.OBJECTS[what]["filemodel"].format(id=id)
+        filepath.touch()
+        with open(filepath, "a") as csvfile:
+            writer = csv.DictWriter(
+                csvfile, fieldnames=self.OBJECTS[what]["logmodel"].model_fields.keys()
+            )
+            writer.writeheader()
+        return id
     def add_flux(self, flux_id: str) -> str:
         """Get the flux metadata"""
-        filepath = self._basepath / self.FILEMODEL["execution"].format(flux_id=flux_id)
-        filepath.touch()
-        with open(filepath, "a") as csvfile:
-            writer = csv.DictWriter(csvfile, fieldnames=ExecutionLog.model_fields.keys())
-            writer.writeheader()
-        return flux_id
+        return self._add_thing(what="flux", id=flux_id)
-    def register_flux_execution(self, flux_id: str, dt: datetime, output: dict) -> ExecutionLog:
-        """Get the flux metadata"""
-        filepath = self._basepath / self.FILEMODEL["execution"].format(flux_id=flux_id)
-        metadata_ = ExecutionLog(datetime=dt, output={"data": output})
+    def add_table(self, table_id: str) -> str:
+        """Get the table metadata"""
+        return self._add_thing(what="table", id=table_id)
+    def _register_things_event(self, what: str, id: str, dt: datetime, event: dict) -> ExecutionLog:
+        filepath = self._basepath / self.OBJECTS[what]["filemodel"].format(id=id)
         if not filepath.exists:
-            raise FSMetaDataRepositoryError(f"The flux {flux_id} hasn't been added yet.")
+            raise FSMetaDataRepositoryError(f"The {what} {id} hasn't been added yet.")
+        metadata_ = self.OBJECTS[what]["logmodel"](datetime=dt, **event)
         with open(filepath, "a") as csvfile:
-            writer = csv.DictWriter(csvfile, fieldnames=ExecutionLog.model_fields.keys())
+            writer = csv.DictWriter(
+                csvfile, fieldnames=self.OBJECTS[what]["logmodel"].model_fields.keys()
+            )
         return metadata_
+    def register_flux_execution(self, flux_id: str, dt: datetime, output: dict) -> ExecutionLog:
+        """Get the flux metadata"""
+        return self._register_things_event("flux", flux_id, dt, {"output": {"data": output}})
+    def register_table_modification(self, table_id: str, dt: datetime, flux_id: str) -> str:
+        """Get the table metadata"""
+        return self._register_things_event("table", table_id, dt, {"flux_id": flux_id})
     def flux(self, flux_id: str) -> ExecutionLog:
         """Get the last flux log"""
         return max(self.flux_logs(flux_id), key=lambda l: l.datetime)
     def flux_logs(self, flux_id: str) -> list[ExecutionLog]:
         """Get all flux logs"""
-        filepath = self._basepath / self.FILEMODEL["execution"].format(flux_id=flux_id)
+        filepath = self._basepath / self.OBJECTS["flux"]["filemodel"].format(id=flux_id)
         if not filepath.exists:
             raise FSMetaDataRepositoryError(f"The flux {flux_id} hasn't been added yet.")
         with open(filepath, "r") as csvfile:
@@ -84,10 +116,10 @@ class FSMetaDataRepository(MetaDataRepository):
         return logs
-    def tables(self) -> list[str]:
-        """List table's name in schema (the id)"""
+    def table(self, table_id: str) -> ModificationLog:
+        """Get the last table's modification metadatas"""
         raise NotImplementedError
-    def table(self, table_id: str) -> ModificationLog:
-        """Get table's metadatas"""
+    def table_logs(self, table_id: str) -> list[ModificationLog]:
+        """Get all table's modification metadatas"""
         raise NotImplementedError
diff --git a/plesna/storage/metadata_repository/ b/plesna/storage/metadata_repository/
index f7a7def..a7b6fe0 100644
--- a/plesna/storage/metadata_repository/
+++ b/plesna/storage/metadata_repository/
@@ -10,6 +10,9 @@ class ModificationLog(BaseModel):
     datetime: datetime
     flux_id: str
+    def to_flat_dict(self):
+        return {"datetime": self.datetime.isoformat(), "flux_id": self.flux_id}
 class ExecutionLog(BaseModel):
     datetime: datetime
diff --git a/tests/storage/ b/tests/storage/
index f96f7f3..f752102 100644
--- a/tests/storage/
+++ b/tests/storage/
@@ -6,7 +6,7 @@ import pytest
 from plesna.models.flux import FluxMetaData
 from import FSMetaDataRepository
-from import ExecutionLog
+from import ExecutionLog, ModificationLog
@@ -30,8 +30,8 @@ def test_add_flux(location, metadata_repository):
     flux_id = "my_flux"
-    metadata_filepath = location / metadata_repository.FILEMODEL["execution"].format(
-        flux_id=flux_id
+    metadata_filepath = location / metadata_repository.OBJECTS["flux"]["filemodel"].format(
+        id=flux_id
     assert metadata_filepath.exists()
@@ -40,6 +40,13 @@ def test_add_flux(location, metadata_repository):
         assert content == "datetime,output\n"
+def test_add_and_list_fluxes(metadata_repository):
+    flux_ids = ["my_flux", "flux2", "blahblah"]
+    for f in flux_ids:
+        metadata_repository.add_flux(f)
+    assert metadata_repository.fluxes() == flux_ids
 def test_register_flux_execution(location, metadata_repository):
     flux_id = "my_flux"
@@ -52,8 +59,8 @@ def test_register_flux_execution(location, metadata_repository):
-    metadata_filepath = location / metadata_repository.FILEMODEL["execution"].format(
-        flux_id=flux_id
+    metadata_filepath = location / metadata_repository.OBJECTS["flux"]["filemodel"].format(
+        id=flux_id
     with open(metadata_filepath, "r") as csvfile:
         content =
@@ -116,8 +123,68 @@ def test_register_and_get_last_log(metadata_repository):
-def test_register_and_list_fluxes(metadata_repository):
-    flux_ids = ["my_flux", "flux2", "blahblah"]
-    for f in flux_ids:
-        metadata_repository.add_flux(f)
-    assert metadata_repository.fluxes() == flux_ids
+def test_add_and_list_tables(metadata_repository):
+    table_ids = ["my_table", "table2", "blahblah"]
+    for f in table_ids:
+        metadata_repository.add_table(f)
+    assert metadata_repository.tables() == table_ids
+def test_register_table_modification(location, metadata_repository):
+    table_id = "my_table"
+    flux_id = "my_flux"
+    metadata_repository.add_table(table_id)
+    metadata_repository.register_table_modification(
+        table_id, datetime(2023, 3, 15, 14, 30), flux_id
+    )
+    metadata_filepath = location / metadata_repository.OBJECTS["table"]["filemodel"].format(
+        id=table_id
+    )
+    with open(metadata_filepath, "r") as csvfile:
+        content =
+        assert content == "datetime,flux_id\n2023-03-15T14:30:00,my_flux\n"
+# def test_register_and_get_logs(metadata_repository):
+#     table_id = "my_table"
+#     flux_id = "my_flux"
+#     metadata_repository.add_table(table_id)
+#     metadata_repository.register_table_modification(
+#         table_id, datetime(2023, 3, 15, 14, 30), flux_id
+#     )
+#     metadata_repository.register_table_modification(
+#         table_id, datetime(2024, 3, 15, 14, 30), flux_id
+#     )
+#     logs = metadata_repository.table_logs(table_id)
+#     assert logs == [
+#         ModificationLog(datetime=datetime(2023, 3, 15, 14, 30), flux_is=flux_id),
+#         ModificationLog(datetime=datetime(2024, 3, 15, 14, 30), flux_is=flux_id),
+#     ]
+# def test_register_and_get_last_log(metadata_repository):
+#     table_id = "my_table"
+#     metadata_repository.add_table(table_id)
+#     metadata_repository.register_table_modification(
+#         table_id,
+#         datetime(2023, 3, 15, 14, 30),
+#         output={"truc": "machin"},
+#     )
+#     metadata_repository.register_table_modification(
+#         table_id,
+#         datetime(2024, 3, 15, 14, 30),
+#         output={
+#             "truc": "chose",
+#         },
+#     )
+#     logs = metadata_repository.table(table_id)
+#     assert logs == modificationLog(
+#         datetime=datetime(2024, 3, 15, 14, 30),
+#         output=TableMetaData(data={"truc": "chose"}),
+#     )