Compare commits

...

3 Commits

3 changed files with 101 additions and 34 deletions

View File

@ -1,10 +1,12 @@
from pathlib import Path
from datetime import datetime
import csv
import json
from plesna.storage.metadata_repository.metadata_repository import (
ExecutionMetaData,
ExecutionLog,
MetaDataRepository,
ModificationMetaData,
ModificationLog,
)
@ -19,8 +21,10 @@ class FSMetaDataRepository(MetaDataRepository):
"""
EXECUTION_FILEMODEL = "{flux_id}_execution.csv"
MODIFICATION_FILEMODEL = "{table_id}_modification.csv"
FILEMODEL = {
"execution": "{flux_id}_execution.csv",
"modification": "{table_id}_modification.csv",
}
def __init__(self, basepath: str):
super().__init__()
@ -34,46 +38,47 @@ class FSMetaDataRepository(MetaDataRepository):
def add_flux(self, flux_id: str) -> str:
"""Get the flux metadata"""
filepath = self._basepath / self.EXECUTION_FILEMODEL.format(flux_id=flux_id)
filepath = self._basepath / self.FILEMODEL["execution"].format(flux_id=flux_id)
filepath.touch()
with open(filepath, "a") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=ExecutionMetaData.__fields__.keys())
writer = csv.DictWriter(csvfile, fieldnames=ExecutionLog.model_fields.keys())
writer.writeheader()
return flux_id
def register_flux_execution(self, flux_id: str, metadata: dict) -> ExecutionMetaData:
def register_flux_execution(self, flux_id: str, dt: datetime, output: dict) -> ExecutionLog:
"""Get the flux metadata"""
filepath = self._basepath / self.EXECUTION_FILEMODEL.format(flux_id=flux_id)
metadata_ = ExecutionMetaData(**metadata)
filepath = self._basepath / self.FILEMODEL["execution"].format(flux_id=flux_id)
metadata_ = ExecutionLog(datetime=dt, output={"data": output})
if not filepath.exists:
raise FSMetaDataRepositoryError(f"The flux {flux_id} hasn't been added yet.")
with open(filepath, "a") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=ExecutionMetaData.__fields__.keys())
writer = csv.DictWriter(csvfile, fieldnames=ExecutionLog.model_fields.keys())
writer.writerow(metadata_.to_flat_dict())
return metadata_
def flux(self, flux_id: str) -> ExecutionMetaData:
"""Get the flux metadata"""
raise NotImplementedError
def flux(self, flux_id: str) -> ExecutionLog:
"""Get the last flux log"""
return max(self.flux_logs(flux_id), key=lambda l: l.datetime)
def flux_logs(self, flux_id: str) -> list[ExecutionMetaData]:
"""Get the flux metadata"""
filepath = self._basepath / self.EXECUTION_FILEMODEL.format(flux_id=flux_id)
def flux_logs(self, flux_id: str) -> list[ExecutionLog]:
"""Get all flux logs"""
filepath = self._basepath / self.FILEMODEL["execution"].format(flux_id=flux_id)
if not filepath.exists:
raise FSMetaDataRepositoryError(f"The flux {flux_id} hasn't been added yet.")
with open(filepath, "r") as csvfile:
logs = []
reader = csv.DictReader(csvfile, fieldnames=ExecutionMetaData.__fields__.keys())
reader = csv.DictReader(csvfile)
for row in reader:
logs.append(ExecutionMetaData(**row))
row["output"] = json.loads(row["output"])
logs.append(ExecutionLog(**row))
return logs
def tables(self) -> list[str]:
"""List table's name in schema (the id)"""
raise NotImplementedError
def table(self, table_id: str) -> ModificationMetaData:
def table(self, table_id: str) -> ModificationLog:
"""Get table's metadatas"""
raise NotImplementedError

View File

@ -6,17 +6,17 @@ from pydantic import BaseModel
from plesna.models.flux import FluxMetaData
class ModificationMetaData(BaseModel):
class ModificationLog(BaseModel):
datetime: datetime
flux_id: str
class ExecutionMetaData(BaseModel):
class ExecutionLog(BaseModel):
datetime: datetime
output: FluxMetaData
def to_flat_dict(self):
return {"datetime": self.datetime.timestamp(), "output": self.output.model_dump_json()}
return {"datetime": self.datetime.isoformat(), "output": self.output.model_dump_json()}
class MetaDataRepository:
@ -36,17 +36,17 @@ class MetaDataRepository:
raise NotImplementedError
@abc.abstractmethod
def register_flux_execution(self, flux_id: str, metadata: ExecutionMetaData) -> str:
def register_flux_execution(self, flux_id: str, dt: datetime, metadata: dict) -> str:
"""Get the flux metadata"""
raise NotImplementedError
@abc.abstractmethod
def flux(self, schema_id: str) -> ExecutionMetaData:
def flux(self, schema_id: str) -> ExecutionLog:
"""Get the flux last execution metadata"""
raise NotImplementedError
@abc.abstractmethod
def flux_logs(self, schema_id: str) -> list[ExecutionMetaData]:
def flux_logs(self, schema_id: str) -> list[ExecutionLog]:
"""Get all the flux execution metadata"""
raise NotImplementedError
@ -63,16 +63,16 @@ class MetaDataRepository:
raise NotImplementedError
@abc.abstractmethod
def register_table_modification(self, table_id: str, metadata: ModificationMetaData) -> str:
def register_table_modification(self, table_id: str, dt: datetime, metadata: dict) -> str:
"""Get the table metadata"""
raise NotImplementedError
@abc.abstractmethod
def table(self, table_id: str) -> ModificationMetaData:
def table(self, table_id: str) -> ModificationLog:
"""Get the last table's modification metadatas"""
raise NotImplementedError
@abc.abstractmethod
def table_logs(self, table_id: str) -> list[ModificationMetaData]:
def table_logs(self, table_id: str) -> list[ModificationLog]:
"""Get all table's modification metadatas"""
raise NotImplementedError

View File

@ -4,7 +4,9 @@ from pathlib import Path
import pytest
from plesna.models.flux import FluxMetaData
from plesna.storage.metadata_repository.fs_metadata_repository import FSMetaDataRepository
from plesna.storage.metadata_repository.metadata_repository import ExecutionLog
@pytest.fixture
@ -28,7 +30,9 @@ def test_add_flux(location, metadata_repository):
flux_id = "my_flux"
metadata_repository.add_flux(flux_id)
metadata_filepath = location / metadata_repository.EXECUTION_FILEMODEL.format(flux_id=flux_id)
metadata_filepath = location / metadata_repository.FILEMODEL["execution"].format(
flux_id=flux_id
)
assert metadata_filepath.exists()
with open(metadata_filepath, "r") as csvfile:
@ -42,13 +46,71 @@ def test_register_flux_execution(location, metadata_repository):
metadata_repository.register_flux_execution(
flux_id,
metadata={
"datetime": datetime(2023, 3, 15, 14, 30),
"output": {"data": {"truc": "machin"}},
datetime(2023, 3, 15, 14, 30),
output={
"truc": "machin",
},
)
metadata_filepath = location / metadata_repository.EXECUTION_FILEMODEL.format(flux_id=flux_id)
metadata_filepath = location / metadata_repository.FILEMODEL["execution"].format(
flux_id=flux_id
)
with open(metadata_filepath, "r") as csvfile:
content = csvfile.read()
assert content == 'datetime,output\n1678887000.0,"{""data"":{""truc"":""machin""}}"\n'
assert (
content == 'datetime,output\n2023-03-15T14:30:00,"{""data"":{""truc"":""machin""}}"\n'
)
def test_register_and_get_logs(metadata_repository):
flux_id = "my_flux"
metadata_repository.add_flux(flux_id)
metadata_repository.register_flux_execution(
flux_id,
datetime(2023, 3, 15, 14, 30),
output={"truc": "machin"},
)
metadata_repository.register_flux_execution(
flux_id,
datetime(2024, 3, 15, 14, 30),
output={
"truc": "chose",
},
)
logs = metadata_repository.flux_logs(flux_id)
assert logs == [
ExecutionLog(
datetime=datetime(2023, 3, 15, 14, 30),
output=FluxMetaData(data={"truc": "machin"}),
),
ExecutionLog(
datetime=datetime(2024, 3, 15, 14, 30),
output=FluxMetaData(data={"truc": "chose"}),
),
]
def test_register_and_get_last_log(metadata_repository):
flux_id = "my_flux"
metadata_repository.add_flux(flux_id)
metadata_repository.register_flux_execution(
flux_id,
datetime(2023, 3, 15, 14, 30),
output={"truc": "machin"},
)
metadata_repository.register_flux_execution(
flux_id,
datetime(2024, 3, 15, 14, 30),
output={
"truc": "chose",
},
)
logs = metadata_repository.flux(flux_id)
assert logs == ExecutionLog(
datetime=datetime(2024, 3, 15, 14, 30),
output=FluxMetaData(data={"truc": "chose"}),
)