Feat: read and write flux logs

Bertrand Benjamin 2025-01-15 17:48:44 +01:00
parent 543b3fe98e
commit 8623cd5960
3 changed files with 43 additions and 5 deletions
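In short: each flux gets its own CSV file of execution metadata; register_flux_execution appends a row and flux_logs reads the rows back as ExecutionMetaData objects. A minimal usage sketch (hypothetical: the constructor argument is assumed to be a base directory, since the diff only shows the repository using self._basepath):

from datetime import datetime

from plesna.storage.metadata_repository.fs_metadata_repository import FSMetaDataRepository

# Assumed constructor: a base directory where the per-flux CSV files live.
repo = FSMetaDataRepository("/tmp/plesna-metadata")

repo.add_flux("my_flux")  # creates the per-flux CSV and writes its header row
repo.register_flux_execution(
    "my_flux",
    metadata={
        "datetime": datetime(2023, 3, 15, 14, 30),
        "output": {"data": {"truc": "machin"}},
    },
)

for log in repo.flux_logs("my_flux"):  # rows parsed back into ExecutionMetaData objects
    print(log.datetime, log.output)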

View File: plesna/storage/metadata_repository/fs_metadata_repository.py

@@ -1,5 +1,6 @@
 from pathlib import Path
 import csv
+import json
 
 from plesna.storage.metadata_repository.metadata_repository import (
     ExecutionMetaData,
@@ -37,7 +38,7 @@ class FSMetaDataRepository(MetaDataRepository):
         filepath = self._basepath / self.EXECUTION_FILEMODEL.format(flux_id=flux_id)
         filepath.touch()
         with open(filepath, "a") as csvfile:
-            writer = csv.DictWriter(csvfile, fieldnames=ExecutionMetaData.__fields__.keys())
+            writer = csv.DictWriter(csvfile, fieldnames=ExecutionMetaData.model_fields.keys())
             writer.writeheader()
 
         return flux_id
@@ -49,7 +50,7 @@
             raise FSMetaDataRepositoryError(f"The flux {flux_id} hasn't been added yet.")
 
         with open(filepath, "a") as csvfile:
-            writer = csv.DictWriter(csvfile, fieldnames=ExecutionMetaData.__fields__.keys())
+            writer = csv.DictWriter(csvfile, fieldnames=ExecutionMetaData.model_fields.keys())
             writer.writerow(metadata_.to_flat_dict())
 
         return metadata_
@@ -65,8 +66,9 @@
             raise FSMetaDataRepositoryError(f"The flux {flux_id} hasn't been added yet.")
 
         with open(filepath, "r") as csvfile:
             logs = []
-            reader = csv.DictReader(csvfile, fieldnames=ExecutionMetaData.__fields__.keys())
+            reader = csv.DictReader(csvfile)
             for row in reader:
+                row["output"] = json.loads(row["output"])
                 logs.append(ExecutionMetaData(**row))
         return logs
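Two details in the read path above matter: csv.DictReader is now created without explicit fieldnames, so it treats the first line as the header instead of yielding it as a data row, and since CSV cells are plain strings the JSON-encoded output column must be decoded with json.loads before Pydantic can rebuild the nested FluxMetaData. The fieldnames switch from __fields__ to model_fields follows the Pydantic v2 rename of the field registry. A standalone sketch of the decoding step, using the same row shape the tests below expect:

import csv
import io
import json

# Mirrors the per-flux CSV content asserted in the test file below.
buf = io.StringIO('datetime,output\n2023-03-15T14:30:00,"{""data"":{""truc"":""machin""}}"\n')

row = next(csv.DictReader(buf))            # the header line supplies the keys
row["output"] = json.loads(row["output"])  # '{"data": {...}}' -> {'data': {...}}
print(row)  # {'datetime': '2023-03-15T14:30:00', 'output': {'data': {'truc': 'machin'}}}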

View File: plesna/storage/metadata_repository/metadata_repository.py

@@ -16,7 +16,7 @@ class ExecutionMetaData(BaseModel):
     output: FluxMetaData
 
     def to_flat_dict(self):
-        return {"datetime": self.datetime.timestamp(), "output": self.output.model_dump_json()}
+        return {"datetime": self.datetime.isoformat(), "output": self.output.model_dump_json()}
 
 
 class MetaDataRepository:
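The to_flat_dict change above stores the datetime as an ISO 8601 string rather than a Unix timestamp. For the naive datetimes used in the tests, isoformat() is deterministic and round-trips cleanly (Pydantic parses ISO strings back into datetime when the log is reloaded), whereas timestamp() interprets a naive datetime in the local timezone: the old expected value 1678887000.0 only holds on a machine running in UTC+1. A quick illustration:

from datetime import datetime

dt = datetime(2023, 3, 15, 14, 30)  # naive datetime, as in the tests
print(dt.isoformat())               # '2023-03-15T14:30:00', the value now written to the CSV
print(datetime.fromisoformat(dt.isoformat()) == dt)  # True: lossless round-trip
# dt.timestamp() depends on the local timezone for this naive value,
# yielding 1678887000.0 only under UTC+1.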

View File

@@ -4,7 +4,9 @@ from pathlib import Path
 import pytest
 
+from plesna.models.flux import FluxMetaData
 from plesna.storage.metadata_repository.fs_metadata_repository import FSMetaDataRepository
+from plesna.storage.metadata_repository.metadata_repository import ExecutionMetaData
 
 
 @pytest.fixture
@@ -51,4 +53,38 @@ def test_register_flux_execution(location, metadata_repository):
     metadata_filepath = location / metadata_repository.EXECUTION_FILEMODEL.format(flux_id=flux_id)
     with open(metadata_filepath, "r") as csvfile:
         content = csvfile.read()
-    assert content == 'datetime,output\n1678887000.0,"{""data"":{""truc"":""machin""}}"\n'
+    assert (
+        content == 'datetime,output\n2023-03-15T14:30:00,"{""data"":{""truc"":""machin""}}"\n'
+    )
+
+
+def test_register_and_get_flux_execution(location, metadata_repository):
+    flux_id = "my_flux"
+    metadata_repository.add_flux(flux_id)
+
+    metadata_repository.register_flux_execution(
+        flux_id,
+        metadata={
+            "datetime": datetime(2023, 3, 15, 14, 30),
+            "output": {"data": {"truc": "machin"}},
+        },
+    )
+    metadata_repository.register_flux_execution(
+        flux_id,
+        metadata={
+            "datetime": datetime(2024, 3, 15, 14, 30),
+            "output": {"data": {"truc": "chose"}},
+        },
+    )
+
+    logs = metadata_repository.flux_logs(flux_id)
+
+    assert logs == [
+        ExecutionMetaData(
+            datetime=datetime(2023, 3, 15, 14, 30),
+            output=FluxMetaData(data={"truc": "machin"}),
+        ),
+        ExecutionMetaData(
+            datetime=datetime(2024, 3, 15, 14, 30),
+            output=FluxMetaData(data={"truc": "chose"}),
+        ),
+    ]