From d7716a4b8efb15af5bd4b976fddf9e1459aa0459 Mon Sep 17 00:00:00 2001
From: Bertrand Benjamin
Date: Sun, 19 Jan 2025 06:47:16 +0100
Subject: [PATCH] Feat: add table log retrieval

---
 .../fs_metadata_repository.py                | 41 +++++-----
 tests/storage/test_fs_metadata_repository.py | 74 +++++++++----------
 2 files changed, 57 insertions(+), 58 deletions(-)

diff --git a/plesna/storage/metadata_repository/fs_metadata_repository.py b/plesna/storage/metadata_repository/fs_metadata_repository.py
index 441b3ad..a08b23e 100644
--- a/plesna/storage/metadata_repository/fs_metadata_repository.py
+++ b/plesna/storage/metadata_repository/fs_metadata_repository.py
@@ -2,6 +2,7 @@ from pathlib import Path
 from datetime import datetime
 import csv
 import json
+from typing import Iterable
 
 from plesna.libs.string_tools import StringToolsError, extract_values_from_pattern
 from plesna.storage.metadata_repository.metadata_repository import (
@@ -99,27 +100,33 @@ class FSMetaDataRepository(MetaDataRepository):
         """Get the table metadata"""
         return self._register_things_event("table", table_id, dt, {"flux_id": flux_id})
 
+    def _get_all_log(self, what: str, id: str) -> Iterable[dict]:
+        """Generate log dicts from the history file"""
+        filepath = self._basepath / self.OBJECTS[what]["filemodel"].format(id=id)
+        if not filepath.exists():
+            raise FSMetaDataRepositoryError(f"The {what} {id} hasn't been added yet.")
+        with open(filepath, "r") as csvfile:
+            reader = csv.DictReader(csvfile)
+            for row in reader:
+                yield row
+
+    def flux_logs(self, flux_id: str) -> list[ExecutionLog]:
+        """Get all flux logs"""
+        logs = []
+        for logline in self._get_all_log("flux", flux_id):
+            logline["output"] = json.loads(logline["output"])
+            logs.append(self.OBJECTS["flux"]["logmodel"](**logline))
+
+        return logs
+
     def flux(self, flux_id: str) -> ExecutionLog:
         """Get the last flux log"""
         return max(self.flux_logs(flux_id), key=lambda l: l.datetime)
 
-    def flux_logs(self, flux_id: str) -> list[ExecutionLog]:
-        """Get all flux logs"""
-        filepath = self._basepath / self.OBJECTS["flux"]["filemodel"].format(id=flux_id)
-        if not filepath.exists:
-            raise FSMetaDataRepositoryError(f"The flux {flux_id} hasn't been added yet.")
-        with open(filepath, "r") as csvfile:
-            logs = []
-            reader = csv.DictReader(csvfile)
-            for row in reader:
-                row["output"] = json.loads(row["output"])
-                logs.append(ExecutionLog(**row))
-        return logs
+    def table_logs(self, table_id: str) -> list[ModificationLog]:
+        """Get all table's modification metadatas"""
+        return [ModificationLog(**log) for log in self._get_all_log("table", table_id)]
 
     def table(self, table_id: str) -> ModificationLog:
         """Get the last table's modification metadatas"""
-        raise NotImplementedError
-
-    def table_logs(self, table_id: str) -> list[ModificationLog]:
-        """Get all table's modification metadatas"""
-        raise NotImplementedError
+        return max(self.table_logs(table_id), key=lambda l: l.datetime)
diff --git a/tests/storage/test_fs_metadata_repository.py b/tests/storage/test_fs_metadata_repository.py
index f752102..acc9130 100644
--- a/tests/storage/test_fs_metadata_repository.py
+++ b/tests/storage/test_fs_metadata_repository.py
@@ -69,7 +69,7 @@ def test_register_flux_execution(location, metadata_repository):
     )
 
 
-def test_register_and_get_logs(metadata_repository):
+def test_register_and_get_exec_logs(metadata_repository):
     flux_id = "my_flux"
     metadata_repository.add_flux(flux_id)
 
@@ -99,7 +99,7 @@
     ]
 
 
-def test_register_and_get_last_log(metadata_repository):
+def test_register_and_get_last_exec_log(metadata_repository):
     flux_id = "my_flux"
     metadata_repository.add_flux(flux_id)
 
@@ -147,44 +147,36 @@ def test_register_table_modification(location, metadata_repository):
     assert content == "datetime,flux_id\n2023-03-15T14:30:00,my_flux\n"
 
 
-# def test_register_and_get_logs(metadata_repository):
-#     table_id = "my_table"
-#     flux_id = "my_flux"
-#     metadata_repository.add_table(table_id)
-#
-#     metadata_repository.register_table_modification(
-#         table_id, datetime(2023, 3, 15, 14, 30), flux_id
-#     )
-#     metadata_repository.register_table_modification(
-#         table_id, datetime(2024, 3, 15, 14, 30), flux_id
-#     )
-#
-#     logs = metadata_repository.table_logs(table_id)
-#     assert logs == [
-#         ModificationLog(datetime=datetime(2023, 3, 15, 14, 30), flux_is=flux_id),
-#         ModificationLog(datetime=datetime(2024, 3, 15, 14, 30), flux_is=flux_id),
-#     ]
+def test_register_and_get_mod_logs(metadata_repository):
+    table_id = "my_table"
+    flux_id = "my_flux"
+    metadata_repository.add_table(table_id)
+
+    metadata_repository.register_table_modification(
+        table_id, datetime(2023, 3, 15, 14, 30), flux_id
+    )
+    metadata_repository.register_table_modification(
+        table_id, datetime(2024, 3, 15, 14, 30), flux_id
+    )
+
+    logs = metadata_repository.table_logs(table_id)
+    assert logs == [
+        ModificationLog(datetime=datetime(2023, 3, 15, 14, 30), flux_id=flux_id),
+        ModificationLog(datetime=datetime(2024, 3, 15, 14, 30), flux_id=flux_id),
+    ]
 
 
-# def test_register_and_get_last_log(metadata_repository):
-#     table_id = "my_table"
-#     metadata_repository.add_table(table_id)
-#
-#     metadata_repository.register_table_modification(
-#         table_id,
-#         datetime(2023, 3, 15, 14, 30),
-#         output={"truc": "machin"},
-#     )
-#     metadata_repository.register_table_modification(
-#         table_id,
-#         datetime(2024, 3, 15, 14, 30),
-#         output={
-#             "truc": "chose",
-#         },
-#     )
-#
-#     logs = metadata_repository.table(table_id)
-#     assert logs == modificationLog(
-#         datetime=datetime(2024, 3, 15, 14, 30),
-#         output=TableMetaData(data={"truc": "chose"}),
-#     )
+def test_register_and_get_last_mod_log(metadata_repository):
+    table_id = "my_table"
+    flux_id = "my_flux"
+    metadata_repository.add_table(table_id)
+
+    metadata_repository.register_table_modification(
+        table_id, datetime(2023, 3, 15, 14, 30), flux_id
+    )
+    metadata_repository.register_table_modification(
+        table_id, datetime(2024, 3, 15, 14, 30), flux_id
+    )
+
+    logs = metadata_repository.table(table_id)
+    assert logs == ModificationLog(datetime=datetime(2024, 3, 15, 14, 30), flux_id=flux_id)
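
Usage sketch (illustrative, not part of the patch): it mirrors the new tests and shows how the added table_logs/table methods are meant to be called. The FSMetaDataRepository constructor argument is an assumption, since the metadata_repository fixture's setup is not visible in these hunks.

from datetime import datetime

from plesna.storage.metadata_repository.fs_metadata_repository import FSMetaDataRepository

# Assumption: the repository is built from a base directory, as suggested by the
# use of self._basepath in fs_metadata_repository.py; adjust to the real signature.
repo = FSMetaDataRepository("/tmp/plesna_metadata")

repo.add_table("my_table")
repo.register_table_modification("my_table", datetime(2023, 3, 15, 14, 30), "my_flux")
repo.register_table_modification("my_table", datetime(2024, 3, 15, 14, 30), "my_flux")

# All ModificationLog entries, in registration order.
print(repo.table_logs("my_table"))
# Only the latest ModificationLog, selected by max datetime.
print(repo.table("my_table"))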