from datetime import datetime import shutil from pathlib import Path import pytest from plesna.models.flux import FluxMetaData from plesna.storage.metadata_repository.fs_metadata_repository import FSMetaDataRepository from plesna.storage.metadata_repository.metadata_repository import ExecutionLog @pytest.fixture def location(tmp_path): catalogpath = tmp_path / "catalog" catalogpath.mkdir() return catalogpath def test_init(location): repo = FSMetaDataRepository(location) @pytest.fixture def metadata_repository(location) -> FSMetaDataRepository: return FSMetaDataRepository(location) def test_add_flux(location, metadata_repository): flux_id = "my_flux" metadata_repository.add_flux(flux_id) metadata_filepath = location / metadata_repository.FILEMODEL["execution"].format( flux_id=flux_id ) assert metadata_filepath.exists() with open(metadata_filepath, "r") as csvfile: content = csvfile.read() assert content == "datetime,output\n" def test_register_flux_execution(location, metadata_repository): flux_id = "my_flux" metadata_repository.add_flux(flux_id) metadata_repository.register_flux_execution( flux_id, datetime(2023, 3, 15, 14, 30), output={ "truc": "machin", }, ) metadata_filepath = location / metadata_repository.FILEMODEL["execution"].format( flux_id=flux_id ) with open(metadata_filepath, "r") as csvfile: content = csvfile.read() assert ( content == 'datetime,output\n2023-03-15T14:30:00,"{""data"":{""truc"":""machin""}}"\n' ) def test_register_and_get_logs(metadata_repository): flux_id = "my_flux" metadata_repository.add_flux(flux_id) metadata_repository.register_flux_execution( flux_id, datetime(2023, 3, 15, 14, 30), output={"truc": "machin"}, ) metadata_repository.register_flux_execution( flux_id, datetime(2024, 3, 15, 14, 30), output={ "truc": "chose", }, ) logs = metadata_repository.flux_logs(flux_id) assert logs == [ ExecutionLog( datetime=datetime(2023, 3, 15, 14, 30), output=FluxMetaData(data={"truc": "machin"}), ), ExecutionLog( datetime=datetime(2024, 3, 15, 14, 30), output=FluxMetaData(data={"truc": "chose"}), ), ] def test_register_and_get_last_log(metadata_repository): flux_id = "my_flux" metadata_repository.add_flux(flux_id) metadata_repository.register_flux_execution( flux_id, datetime(2023, 3, 15, 14, 30), output={"truc": "machin"}, ) metadata_repository.register_flux_execution( flux_id, datetime(2024, 3, 15, 14, 30), output={ "truc": "chose", }, ) logs = metadata_repository.flux(flux_id) assert logs == ExecutionLog( datetime=datetime(2024, 3, 15, 14, 30), output=FluxMetaData(data={"truc": "chose"}), )