refact: rename metadata to log

This commit is contained in:
Bertrand Benjamin 2025-01-15 18:01:30 +01:00
parent 8623cd5960
commit eec3a13dbb
3 changed files with 30 additions and 31 deletions

View File

@ -1,11 +1,12 @@
from pathlib import Path from pathlib import Path
from datetime import datetime
import csv import csv
import json import json
from plesna.storage.metadata_repository.metadata_repository import ( from plesna.storage.metadata_repository.metadata_repository import (
ExecutionMetaData, ExecutionLog,
MetaDataRepository, MetaDataRepository,
ModificationMetaData, ModificationLog,
) )
@ -38,28 +39,28 @@ class FSMetaDataRepository(MetaDataRepository):
filepath = self._basepath / self.EXECUTION_FILEMODEL.format(flux_id=flux_id) filepath = self._basepath / self.EXECUTION_FILEMODEL.format(flux_id=flux_id)
filepath.touch() filepath.touch()
with open(filepath, "a") as csvfile: with open(filepath, "a") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=ExecutionMetaData.model_fields.keys()) writer = csv.DictWriter(csvfile, fieldnames=ExecutionLog.model_fields.keys())
writer.writeheader() writer.writeheader()
return flux_id return flux_id
def register_flux_execution(self, flux_id: str, metadata: dict) -> ExecutionMetaData: def register_flux_execution(self, flux_id: str, dt: datetime, output: dict) -> ExecutionLog:
"""Get the flux metadata""" """Get the flux metadata"""
filepath = self._basepath / self.EXECUTION_FILEMODEL.format(flux_id=flux_id) filepath = self._basepath / self.EXECUTION_FILEMODEL.format(flux_id=flux_id)
metadata_ = ExecutionMetaData(**metadata) metadata_ = ExecutionLog(datetime=dt, output={"data": output})
if not filepath.exists: if not filepath.exists:
raise FSMetaDataRepositoryError(f"The flux {flux_id} hasn't been added yet.") raise FSMetaDataRepositoryError(f"The flux {flux_id} hasn't been added yet.")
with open(filepath, "a") as csvfile: with open(filepath, "a") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=ExecutionMetaData.model_fields.keys()) writer = csv.DictWriter(csvfile, fieldnames=ExecutionLog.model_fields.keys())
writer.writerow(metadata_.to_flat_dict()) writer.writerow(metadata_.to_flat_dict())
return metadata_ return metadata_
def flux(self, flux_id: str) -> ExecutionMetaData: def flux(self, flux_id: str) -> ExecutionLog:
"""Get the flux metadata""" """Get the flux metadata"""
raise NotImplementedError raise NotImplementedError
def flux_logs(self, flux_id: str) -> list[ExecutionMetaData]: def flux_logs(self, flux_id: str) -> list[ExecutionLog]:
"""Get the flux metadata""" """Get the flux metadata"""
filepath = self._basepath / self.EXECUTION_FILEMODEL.format(flux_id=flux_id) filepath = self._basepath / self.EXECUTION_FILEMODEL.format(flux_id=flux_id)
if not filepath.exists: if not filepath.exists:
@ -69,13 +70,13 @@ class FSMetaDataRepository(MetaDataRepository):
reader = csv.DictReader(csvfile) reader = csv.DictReader(csvfile)
for row in reader: for row in reader:
row["output"] = json.loads(row["output"]) row["output"] = json.loads(row["output"])
logs.append(ExecutionMetaData(**row)) logs.append(ExecutionLog(**row))
return logs return logs
def tables(self) -> list[str]: def tables(self) -> list[str]:
"""List table's name in schema (the id)""" """List table's name in schema (the id)"""
raise NotImplementedError raise NotImplementedError
def table(self, table_id: str) -> ModificationMetaData: def table(self, table_id: str) -> ModificationLog:
"""Get table's metadatas""" """Get table's metadatas"""
raise NotImplementedError raise NotImplementedError

View File

@ -6,12 +6,12 @@ from pydantic import BaseModel
from plesna.models.flux import FluxMetaData from plesna.models.flux import FluxMetaData
class ModificationMetaData(BaseModel): class ModificationLog(BaseModel):
datetime: datetime datetime: datetime
flux_id: str flux_id: str
class ExecutionMetaData(BaseModel): class ExecutionLog(BaseModel):
datetime: datetime datetime: datetime
output: FluxMetaData output: FluxMetaData
@ -36,17 +36,17 @@ class MetaDataRepository:
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def register_flux_execution(self, flux_id: str, metadata: ExecutionMetaData) -> str: def register_flux_execution(self, flux_id: str, dt: datetime, metadata: dict) -> str:
"""Get the flux metadata""" """Get the flux metadata"""
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def flux(self, schema_id: str) -> ExecutionMetaData: def flux(self, schema_id: str) -> ExecutionLog:
"""Get the flux last execution metadata""" """Get the flux last execution metadata"""
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def flux_logs(self, schema_id: str) -> list[ExecutionMetaData]: def flux_logs(self, schema_id: str) -> list[ExecutionLog]:
"""Get all the flux execution metadata""" """Get all the flux execution metadata"""
raise NotImplementedError raise NotImplementedError
@ -63,16 +63,16 @@ class MetaDataRepository:
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def register_table_modification(self, table_id: str, metadata: ModificationMetaData) -> str: def register_table_modification(self, table_id: str, dt: datetime, metadata: dict) -> str:
"""Get the table metadata""" """Get the table metadata"""
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def table(self, table_id: str) -> ModificationMetaData: def table(self, table_id: str) -> ModificationLog:
"""Get the last table's modification metadatas""" """Get the last table's modification metadatas"""
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def table_logs(self, table_id: str) -> list[ModificationMetaData]: def table_logs(self, table_id: str) -> list[ModificationLog]:
"""Get all table's modification metadatas""" """Get all table's modification metadatas"""
raise NotImplementedError raise NotImplementedError

View File

@ -6,7 +6,7 @@ import pytest
from plesna.models.flux import FluxMetaData from plesna.models.flux import FluxMetaData
from plesna.storage.metadata_repository.fs_metadata_repository import FSMetaDataRepository from plesna.storage.metadata_repository.fs_metadata_repository import FSMetaDataRepository
from plesna.storage.metadata_repository.metadata_repository import ExecutionMetaData from plesna.storage.metadata_repository.metadata_repository import ExecutionLog
@pytest.fixture @pytest.fixture
@ -44,9 +44,9 @@ def test_register_flux_execution(location, metadata_repository):
metadata_repository.register_flux_execution( metadata_repository.register_flux_execution(
flux_id, flux_id,
metadata={ datetime(2023, 3, 15, 14, 30),
"datetime": datetime(2023, 3, 15, 14, 30), output={
"output": {"data": {"truc": "machin"}}, "truc": "machin",
}, },
) )
@ -64,26 +64,24 @@ def test_register_and_get_flux_execution(location, metadata_repository):
metadata_repository.register_flux_execution( metadata_repository.register_flux_execution(
flux_id, flux_id,
metadata={ datetime(2023, 3, 15, 14, 30),
"datetime": datetime(2023, 3, 15, 14, 30), output={"truc": "machin"},
"output": {"data": {"truc": "machin"}},
},
) )
metadata_repository.register_flux_execution( metadata_repository.register_flux_execution(
flux_id, flux_id,
metadata={ datetime(2024, 3, 15, 14, 30),
"datetime": datetime(2024, 3, 15, 14, 30), output={
"output": {"data": {"truc": "chose"}}, "truc": "chose",
}, },
) )
logs = metadata_repository.flux_logs(flux_id) logs = metadata_repository.flux_logs(flux_id)
assert logs == [ assert logs == [
ExecutionMetaData( ExecutionLog(
datetime=datetime(2023, 3, 15, 14, 30), datetime=datetime(2023, 3, 15, 14, 30),
output=FluxMetaData(data={"truc": "machin"}), output=FluxMetaData(data={"truc": "machin"}),
), ),
ExecutionMetaData( ExecutionLog(
datetime=datetime(2024, 3, 15, 14, 30), datetime=datetime(2024, 3, 15, 14, 30),
output=FluxMetaData(data={"truc": "chose"}), output=FluxMetaData(data={"truc": "chose"}),
), ),