Feat: add table logs retreiving

This commit is contained in:
Bertrand Benjamin 2025-01-19 06:47:16 +01:00
parent 478a8c2403
commit d7716a4b8e
2 changed files with 57 additions and 58 deletions

View File

@ -2,6 +2,7 @@ from pathlib import Path
from datetime import datetime
import csv
import json
from typing import Iterable
from plesna.libs.string_tools import StringToolsError, extract_values_from_pattern
from plesna.storage.metadata_repository.metadata_repository import (
@ -99,27 +100,33 @@ class FSMetaDataRepository(MetaDataRepository):
"""Get the table metadata"""
return self._register_things_event("table", table_id, dt, {"flux_id": flux_id})
def _get_all_log(self, what: str, id: str) -> Iterable[dict]:
"""Generate log dict from history"""
filepath = self._basepath / self.OBJECTS[what]["filemodel"].format(id=id)
if not filepath.exists:
raise FSMetaDataRepositoryError(f"The {what} {id} hasn't been added yet.")
with open(filepath, "r") as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
yield row
def flux_logs(self, flux_id: str) -> list[ExecutionLog]:
"""Get all flux logs"""
logs = []
for logline in self._get_all_log("flux", flux_id):
logline["output"] = json.loads(logline["output"])
logs.append(self.OBJECTS["flux"]["logmodel"](**logline))
return logs
def flux(self, flux_id: str) -> ExecutionLog:
"""Get the last flux log"""
return max(self.flux_logs(flux_id), key=lambda l: l.datetime)
def flux_logs(self, flux_id: str) -> list[ExecutionLog]:
"""Get all flux logs"""
filepath = self._basepath / self.OBJECTS["flux"]["filemodel"].format(id=flux_id)
if not filepath.exists:
raise FSMetaDataRepositoryError(f"The flux {flux_id} hasn't been added yet.")
with open(filepath, "r") as csvfile:
logs = []
reader = csv.DictReader(csvfile)
for row in reader:
row["output"] = json.loads(row["output"])
logs.append(ExecutionLog(**row))
return logs
def table_logs(self, table_id: str) -> list[ModificationLog]:
"""Get all table's modification metadatas"""
return [ModificationLog(**log) for log in self._get_all_log("table", table_id)]
def table(self, table_id: str) -> ModificationLog:
"""Get the last table's modification metadatas"""
raise NotImplementedError
def table_logs(self, table_id: str) -> list[ModificationLog]:
"""Get all table's modification metadatas"""
raise NotImplementedError
return max(self.table_logs(table_id), key=lambda l: l.datetime)

View File

@ -69,7 +69,7 @@ def test_register_flux_execution(location, metadata_repository):
)
def test_register_and_get_logs(metadata_repository):
def test_register_and_get_exec_logs(metadata_repository):
flux_id = "my_flux"
metadata_repository.add_flux(flux_id)
@ -99,7 +99,7 @@ def test_register_and_get_logs(metadata_repository):
]
def test_register_and_get_last_log(metadata_repository):
def test_register_and_get_last_exec_log(metadata_repository):
flux_id = "my_flux"
metadata_repository.add_flux(flux_id)
@ -147,44 +147,36 @@ def test_register_table_modification(location, metadata_repository):
assert content == "datetime,flux_id\n2023-03-15T14:30:00,my_flux\n"
# def test_register_and_get_logs(metadata_repository):
# table_id = "my_table"
# flux_id = "my_flux"
# metadata_repository.add_table(table_id)
#
# metadata_repository.register_table_modification(
# table_id, datetime(2023, 3, 15, 14, 30), flux_id
# )
# metadata_repository.register_table_modification(
# table_id, datetime(2024, 3, 15, 14, 30), flux_id
# )
#
# logs = metadata_repository.table_logs(table_id)
# assert logs == [
# ModificationLog(datetime=datetime(2023, 3, 15, 14, 30), flux_is=flux_id),
# ModificationLog(datetime=datetime(2024, 3, 15, 14, 30), flux_is=flux_id),
# ]
def test_register_and_get_mod_logs(metadata_repository):
table_id = "my_table"
flux_id = "my_flux"
metadata_repository.add_table(table_id)
metadata_repository.register_table_modification(
table_id, datetime(2023, 3, 15, 14, 30), flux_id
)
metadata_repository.register_table_modification(
table_id, datetime(2024, 3, 15, 14, 30), flux_id
)
logs = metadata_repository.table_logs(table_id)
assert logs == [
ModificationLog(datetime=datetime(2023, 3, 15, 14, 30), flux_id=flux_id),
ModificationLog(datetime=datetime(2024, 3, 15, 14, 30), flux_id=flux_id),
]
# def test_register_and_get_last_log(metadata_repository):
# table_id = "my_table"
# metadata_repository.add_table(table_id)
#
# metadata_repository.register_table_modification(
# table_id,
# datetime(2023, 3, 15, 14, 30),
# output={"truc": "machin"},
# )
# metadata_repository.register_table_modification(
# table_id,
# datetime(2024, 3, 15, 14, 30),
# output={
# "truc": "chose",
# },
# )
#
# logs = metadata_repository.table(table_id)
# assert logs == modificationLog(
# datetime=datetime(2024, 3, 15, 14, 30),
# output=TableMetaData(data={"truc": "chose"}),
# )
def test_register_and_get_last_log(metadata_repository):
table_id = "my_table"
flux_id = "my_flux"
metadata_repository.add_table(table_id)
metadata_repository.register_table_modification(
table_id, datetime(2023, 3, 15, 14, 30), flux_id
)
metadata_repository.register_table_modification(
table_id, datetime(2024, 3, 15, 14, 30), flux_id
)
logs = metadata_repository.table(table_id)
assert logs == ModificationLog(datetime=datetime(2024, 3, 15, 14, 30), flux_id=flux_id)