Feat: add listing fluxes

This commit is contained in:
Bertrand Benjamin 2025-01-17 06:29:54 +01:00
parent 2a387a1bc8
commit 8882317a47
2 changed files with 17 additions and 1 deletions

View File

@ -3,6 +3,7 @@ from datetime import datetime
import csv
import json
from plesna.libs.string_tools import StringToolsError, extract_values_from_pattern
from plesna.storage.metadata_repository.metadata_repository import (
ExecutionLog,
MetaDataRepository,
@ -34,7 +35,15 @@ class FSMetaDataRepository(MetaDataRepository):
def fluxes(self) -> list[str]:
"""List fluxes's ids"""
raise NotImplementedError
fluxes = []
for filepath in self._basepath.iterdir():
try:
founded = extract_values_from_pattern(self.FILEMODEL["execution"], filepath.name)
except StringToolsError:
pass
else:
fluxes.append(founded["flux_id"])
return fluxes
def add_flux(self, flux_id: str) -> str:
"""Get the flux metadata"""

View File

@ -114,3 +114,10 @@ def test_register_and_get_last_log(metadata_repository):
datetime=datetime(2024, 3, 15, 14, 30),
output=FluxMetaData(data={"truc": "chose"}),
)
def test_register_and_list_fluxes(metadata_repository):
flux_ids = ["my_flux", "flux2", "blahblah"]
for f in flux_ids:
metadata_repository.add_flux(f)
assert metadata_repository.fluxes() == flux_ids