Feat: register table modifications
This commit is contained in:
parent
8882317a47
commit
478a8c2403
@ -22,9 +22,9 @@ class FSMetaDataRepository(MetaDataRepository):
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
FILEMODEL = {
|
OBJECTS = {
|
||||||
"execution": "{flux_id}_execution.csv",
|
"flux": {"filemodel": "{id}_execution.csv", "logmodel": ExecutionLog},
|
||||||
"modification": "{table_id}_modification.csv",
|
"table": {"filemodel": "{id}_execution.csv", "logmodel": ModificationLog},
|
||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, basepath: str):
|
def __init__(self, basepath: str):
|
||||||
@ -33,47 +33,79 @@ class FSMetaDataRepository(MetaDataRepository):
|
|||||||
self._basepath = Path(basepath)
|
self._basepath = Path(basepath)
|
||||||
assert self._basepath.exists()
|
assert self._basepath.exists()
|
||||||
|
|
||||||
def fluxes(self) -> list[str]:
|
def get_things(self, what: str) -> list[str]:
|
||||||
"""List fluxes's ids"""
|
"""List all ids for 'what'"""
|
||||||
fluxes = []
|
whats = []
|
||||||
for filepath in self._basepath.iterdir():
|
for filepath in self._basepath.iterdir():
|
||||||
try:
|
try:
|
||||||
founded = extract_values_from_pattern(self.FILEMODEL["execution"], filepath.name)
|
founded = extract_values_from_pattern(
|
||||||
|
self.OBJECTS[what]["filemodel"], filepath.name
|
||||||
|
)
|
||||||
except StringToolsError:
|
except StringToolsError:
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
fluxes.append(founded["flux_id"])
|
whats.append(founded["id"])
|
||||||
return fluxes
|
return whats
|
||||||
|
|
||||||
|
def fluxes(self) -> list[str]:
|
||||||
|
"""List fluxes's ids"""
|
||||||
|
return self.get_things(what="flux")
|
||||||
|
|
||||||
|
def tables(
|
||||||
|
self,
|
||||||
|
) -> list[str]:
|
||||||
|
"""List all table's ids"""
|
||||||
|
return self.get_things(what="table")
|
||||||
|
|
||||||
|
def _add_thing(self, what: str, id: str) -> str:
|
||||||
|
"""Add the new things 'what'"""
|
||||||
|
filepath = self._basepath / self.OBJECTS[what]["filemodel"].format(id=id)
|
||||||
|
filepath.touch()
|
||||||
|
with open(filepath, "a") as csvfile:
|
||||||
|
writer = csv.DictWriter(
|
||||||
|
csvfile, fieldnames=self.OBJECTS[what]["logmodel"].model_fields.keys()
|
||||||
|
)
|
||||||
|
writer.writeheader()
|
||||||
|
return id
|
||||||
|
|
||||||
def add_flux(self, flux_id: str) -> str:
|
def add_flux(self, flux_id: str) -> str:
|
||||||
"""Get the flux metadata"""
|
"""Get the flux metadata"""
|
||||||
filepath = self._basepath / self.FILEMODEL["execution"].format(flux_id=flux_id)
|
return self._add_thing(what="flux", id=flux_id)
|
||||||
filepath.touch()
|
|
||||||
with open(filepath, "a") as csvfile:
|
|
||||||
writer = csv.DictWriter(csvfile, fieldnames=ExecutionLog.model_fields.keys())
|
|
||||||
writer.writeheader()
|
|
||||||
return flux_id
|
|
||||||
|
|
||||||
def register_flux_execution(self, flux_id: str, dt: datetime, output: dict) -> ExecutionLog:
|
def add_table(self, table_id: str) -> str:
|
||||||
"""Get the flux metadata"""
|
"""Get the table metadata"""
|
||||||
filepath = self._basepath / self.FILEMODEL["execution"].format(flux_id=flux_id)
|
return self._add_thing(what="table", id=table_id)
|
||||||
metadata_ = ExecutionLog(datetime=dt, output={"data": output})
|
|
||||||
|
def _register_things_event(self, what: str, id: str, dt: datetime, event: dict) -> ExecutionLog:
|
||||||
|
filepath = self._basepath / self.OBJECTS[what]["filemodel"].format(id=id)
|
||||||
if not filepath.exists:
|
if not filepath.exists:
|
||||||
raise FSMetaDataRepositoryError(f"The flux {flux_id} hasn't been added yet.")
|
raise FSMetaDataRepositoryError(f"The {what} {id} hasn't been added yet.")
|
||||||
|
|
||||||
|
metadata_ = self.OBJECTS[what]["logmodel"](datetime=dt, **event)
|
||||||
|
|
||||||
with open(filepath, "a") as csvfile:
|
with open(filepath, "a") as csvfile:
|
||||||
writer = csv.DictWriter(csvfile, fieldnames=ExecutionLog.model_fields.keys())
|
writer = csv.DictWriter(
|
||||||
|
csvfile, fieldnames=self.OBJECTS[what]["logmodel"].model_fields.keys()
|
||||||
|
)
|
||||||
writer.writerow(metadata_.to_flat_dict())
|
writer.writerow(metadata_.to_flat_dict())
|
||||||
|
|
||||||
return metadata_
|
return metadata_
|
||||||
|
|
||||||
|
def register_flux_execution(self, flux_id: str, dt: datetime, output: dict) -> ExecutionLog:
|
||||||
|
"""Get the flux metadata"""
|
||||||
|
return self._register_things_event("flux", flux_id, dt, {"output": {"data": output}})
|
||||||
|
|
||||||
|
def register_table_modification(self, table_id: str, dt: datetime, flux_id: str) -> str:
|
||||||
|
"""Get the table metadata"""
|
||||||
|
return self._register_things_event("table", table_id, dt, {"flux_id": flux_id})
|
||||||
|
|
||||||
def flux(self, flux_id: str) -> ExecutionLog:
|
def flux(self, flux_id: str) -> ExecutionLog:
|
||||||
"""Get the last flux log"""
|
"""Get the last flux log"""
|
||||||
return max(self.flux_logs(flux_id), key=lambda l: l.datetime)
|
return max(self.flux_logs(flux_id), key=lambda l: l.datetime)
|
||||||
|
|
||||||
def flux_logs(self, flux_id: str) -> list[ExecutionLog]:
|
def flux_logs(self, flux_id: str) -> list[ExecutionLog]:
|
||||||
"""Get all flux logs"""
|
"""Get all flux logs"""
|
||||||
filepath = self._basepath / self.FILEMODEL["execution"].format(flux_id=flux_id)
|
filepath = self._basepath / self.OBJECTS["flux"]["filemodel"].format(id=flux_id)
|
||||||
if not filepath.exists:
|
if not filepath.exists:
|
||||||
raise FSMetaDataRepositoryError(f"The flux {flux_id} hasn't been added yet.")
|
raise FSMetaDataRepositoryError(f"The flux {flux_id} hasn't been added yet.")
|
||||||
with open(filepath, "r") as csvfile:
|
with open(filepath, "r") as csvfile:
|
||||||
@ -84,10 +116,10 @@ class FSMetaDataRepository(MetaDataRepository):
|
|||||||
logs.append(ExecutionLog(**row))
|
logs.append(ExecutionLog(**row))
|
||||||
return logs
|
return logs
|
||||||
|
|
||||||
def tables(self) -> list[str]:
|
def table(self, table_id: str) -> ModificationLog:
|
||||||
"""List table's name in schema (the id)"""
|
"""Get the last table's modification metadatas"""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
def table(self, table_id: str) -> ModificationLog:
|
def table_logs(self, table_id: str) -> list[ModificationLog]:
|
||||||
"""Get table's metadatas"""
|
"""Get all table's modification metadatas"""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
@ -10,6 +10,9 @@ class ModificationLog(BaseModel):
|
|||||||
datetime: datetime
|
datetime: datetime
|
||||||
flux_id: str
|
flux_id: str
|
||||||
|
|
||||||
|
def to_flat_dict(self):
|
||||||
|
return {"datetime": self.datetime.isoformat(), "flux_id": self.flux_id}
|
||||||
|
|
||||||
|
|
||||||
class ExecutionLog(BaseModel):
|
class ExecutionLog(BaseModel):
|
||||||
datetime: datetime
|
datetime: datetime
|
||||||
|
@ -6,7 +6,7 @@ import pytest
|
|||||||
|
|
||||||
from plesna.models.flux import FluxMetaData
|
from plesna.models.flux import FluxMetaData
|
||||||
from plesna.storage.metadata_repository.fs_metadata_repository import FSMetaDataRepository
|
from plesna.storage.metadata_repository.fs_metadata_repository import FSMetaDataRepository
|
||||||
from plesna.storage.metadata_repository.metadata_repository import ExecutionLog
|
from plesna.storage.metadata_repository.metadata_repository import ExecutionLog, ModificationLog
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
@ -30,8 +30,8 @@ def test_add_flux(location, metadata_repository):
|
|||||||
flux_id = "my_flux"
|
flux_id = "my_flux"
|
||||||
metadata_repository.add_flux(flux_id)
|
metadata_repository.add_flux(flux_id)
|
||||||
|
|
||||||
metadata_filepath = location / metadata_repository.FILEMODEL["execution"].format(
|
metadata_filepath = location / metadata_repository.OBJECTS["flux"]["filemodel"].format(
|
||||||
flux_id=flux_id
|
id=flux_id
|
||||||
)
|
)
|
||||||
assert metadata_filepath.exists()
|
assert metadata_filepath.exists()
|
||||||
|
|
||||||
@ -40,6 +40,13 @@ def test_add_flux(location, metadata_repository):
|
|||||||
assert content == "datetime,output\n"
|
assert content == "datetime,output\n"
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_and_list_fluxes(metadata_repository):
|
||||||
|
flux_ids = ["my_flux", "flux2", "blahblah"]
|
||||||
|
for f in flux_ids:
|
||||||
|
metadata_repository.add_flux(f)
|
||||||
|
assert metadata_repository.fluxes() == flux_ids
|
||||||
|
|
||||||
|
|
||||||
def test_register_flux_execution(location, metadata_repository):
|
def test_register_flux_execution(location, metadata_repository):
|
||||||
flux_id = "my_flux"
|
flux_id = "my_flux"
|
||||||
metadata_repository.add_flux(flux_id)
|
metadata_repository.add_flux(flux_id)
|
||||||
@ -52,8 +59,8 @@ def test_register_flux_execution(location, metadata_repository):
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
metadata_filepath = location / metadata_repository.FILEMODEL["execution"].format(
|
metadata_filepath = location / metadata_repository.OBJECTS["flux"]["filemodel"].format(
|
||||||
flux_id=flux_id
|
id=flux_id
|
||||||
)
|
)
|
||||||
with open(metadata_filepath, "r") as csvfile:
|
with open(metadata_filepath, "r") as csvfile:
|
||||||
content = csvfile.read()
|
content = csvfile.read()
|
||||||
@ -116,8 +123,68 @@ def test_register_and_get_last_log(metadata_repository):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_register_and_list_fluxes(metadata_repository):
|
def test_add_and_list_tables(metadata_repository):
|
||||||
flux_ids = ["my_flux", "flux2", "blahblah"]
|
table_ids = ["my_table", "table2", "blahblah"]
|
||||||
for f in flux_ids:
|
for f in table_ids:
|
||||||
metadata_repository.add_flux(f)
|
metadata_repository.add_table(f)
|
||||||
assert metadata_repository.fluxes() == flux_ids
|
assert metadata_repository.tables() == table_ids
|
||||||
|
|
||||||
|
|
||||||
|
def test_register_table_modification(location, metadata_repository):
|
||||||
|
table_id = "my_table"
|
||||||
|
flux_id = "my_flux"
|
||||||
|
metadata_repository.add_table(table_id)
|
||||||
|
|
||||||
|
metadata_repository.register_table_modification(
|
||||||
|
table_id, datetime(2023, 3, 15, 14, 30), flux_id
|
||||||
|
)
|
||||||
|
|
||||||
|
metadata_filepath = location / metadata_repository.OBJECTS["table"]["filemodel"].format(
|
||||||
|
id=table_id
|
||||||
|
)
|
||||||
|
with open(metadata_filepath, "r") as csvfile:
|
||||||
|
content = csvfile.read()
|
||||||
|
assert content == "datetime,flux_id\n2023-03-15T14:30:00,my_flux\n"
|
||||||
|
|
||||||
|
|
||||||
|
# def test_register_and_get_logs(metadata_repository):
|
||||||
|
# table_id = "my_table"
|
||||||
|
# flux_id = "my_flux"
|
||||||
|
# metadata_repository.add_table(table_id)
|
||||||
|
#
|
||||||
|
# metadata_repository.register_table_modification(
|
||||||
|
# table_id, datetime(2023, 3, 15, 14, 30), flux_id
|
||||||
|
# )
|
||||||
|
# metadata_repository.register_table_modification(
|
||||||
|
# table_id, datetime(2024, 3, 15, 14, 30), flux_id
|
||||||
|
# )
|
||||||
|
#
|
||||||
|
# logs = metadata_repository.table_logs(table_id)
|
||||||
|
# assert logs == [
|
||||||
|
# ModificationLog(datetime=datetime(2023, 3, 15, 14, 30), flux_is=flux_id),
|
||||||
|
# ModificationLog(datetime=datetime(2024, 3, 15, 14, 30), flux_is=flux_id),
|
||||||
|
# ]
|
||||||
|
|
||||||
|
|
||||||
|
# def test_register_and_get_last_log(metadata_repository):
|
||||||
|
# table_id = "my_table"
|
||||||
|
# metadata_repository.add_table(table_id)
|
||||||
|
#
|
||||||
|
# metadata_repository.register_table_modification(
|
||||||
|
# table_id,
|
||||||
|
# datetime(2023, 3, 15, 14, 30),
|
||||||
|
# output={"truc": "machin"},
|
||||||
|
# )
|
||||||
|
# metadata_repository.register_table_modification(
|
||||||
|
# table_id,
|
||||||
|
# datetime(2024, 3, 15, 14, 30),
|
||||||
|
# output={
|
||||||
|
# "truc": "chose",
|
||||||
|
# },
|
||||||
|
# )
|
||||||
|
#
|
||||||
|
# logs = metadata_repository.table(table_id)
|
||||||
|
# assert logs == modificationLog(
|
||||||
|
# datetime=datetime(2024, 3, 15, 14, 30),
|
||||||
|
# output=TableMetaData(data={"truc": "chose"}),
|
||||||
|
# )
|
||||||
|
Loading…
Reference in New Issue
Block a user