"""Tests for the filesystem-backed metadata repository (``FSMetaDataRepository``)."""

from datetime import datetime
import shutil
from pathlib import Path

import pytest

from plesna.models.flux import FluxMetaData
from plesna.storage.metadata_repository.fs_metadata_repository import FSMetaDataRepository
from plesna.storage.metadata_repository.metadata_repository import ExecutionLog, ModificationLog

# Timestamps shared by the registration tests; the second is strictly later
# than the first so that "last log" assertions are unambiguous.
FIRST_TIMESTAMP = datetime(2023, 3, 15, 14, 30)
SECOND_TIMESTAMP = datetime(2024, 3, 15, 14, 30)


def _metadata_filepath(
    location: Path,
    repository: FSMetaDataRepository,
    kind: str,
    object_id: str,
) -> Path:
    """Return the path of the metadata file for *object_id*.

    The file name is built from the ``filemodel`` template that the repository
    declares in ``OBJECTS[kind]`` (``kind`` is ``"flux"`` or ``"table"`` in
    these tests), resolved relative to *location*.
    """
    return location / repository.OBJECTS[kind]["filemodel"].format(id=object_id)


@pytest.fixture
def location(tmp_path: Path) -> Path:
    """Create and return an empty ``catalog`` directory under pytest's tmp dir."""
    catalogpath = tmp_path / "catalog"
    catalogpath.mkdir()
    return catalogpath


def test_init(location: Path) -> None:
    """Building a repository on an existing directory must not raise."""
    # Smoke test: the instance itself is not inspected, so it is not bound.
    FSMetaDataRepository(location)


@pytest.fixture
def metadata_repository(location: Path) -> FSMetaDataRepository:
    """Return a repository rooted at the temporary catalog directory."""
    return FSMetaDataRepository(location)


def test_add_flux(location: Path, metadata_repository: FSMetaDataRepository) -> None:
    """Adding a flux creates its metadata file holding only the CSV header."""
    flux_id = "my_flux"
    metadata_repository.add_flux(flux_id)

    metadata_filepath = _metadata_filepath(location, metadata_repository, "flux", flux_id)
    assert metadata_filepath.exists()

    content = metadata_filepath.read_text(encoding="utf-8")
    assert content == "datetime,output\n"


def test_add_and_list_fluxes(metadata_repository: FSMetaDataRepository) -> None:
    """``fluxes()`` returns the added flux ids, compared in insertion order."""
    flux_ids = ["my_flux", "flux2", "blahblah"]
    for flux_id in flux_ids:
        metadata_repository.add_flux(flux_id)

    assert metadata_repository.fluxes() == flux_ids


def test_register_flux_execution(
    location: Path, metadata_repository: FSMetaDataRepository
) -> None:
    """Registering an execution appends one CSV row to the flux metadata file."""
    flux_id = "my_flux"
    metadata_repository.add_flux(flux_id)
    metadata_repository.register_flux_execution(
        flux_id,
        FIRST_TIMESTAMP,
        output={
            "truc": "machin",
        },
    )

    metadata_filepath = _metadata_filepath(location, metadata_repository, "flux", flux_id)
    content = metadata_filepath.read_text(encoding="utf-8")
    # The output column is the JSON payload wrapped in CSV quoting, hence the
    # doubled double-quotes.
    assert (
        content == 'datetime,output\n2023-03-15T14:30:00,"{""data"":{""truc"":""machin""}}"\n'
    )


def test_register_and_get_exec_logs(metadata_repository: FSMetaDataRepository) -> None:
    """``flux_logs()`` returns every registered execution, oldest first."""
    flux_id = "my_flux"
    metadata_repository.add_flux(flux_id)
    metadata_repository.register_flux_execution(
        flux_id,
        FIRST_TIMESTAMP,
        output={"truc": "machin"},
    )
    metadata_repository.register_flux_execution(
        flux_id,
        SECOND_TIMESTAMP,
        output={
            "truc": "chose",
        },
    )

    logs = metadata_repository.flux_logs(flux_id)
    assert logs == [
        ExecutionLog(
            datetime=FIRST_TIMESTAMP,
            output=FluxMetaData(data={"truc": "machin"}),
        ),
        ExecutionLog(
            datetime=SECOND_TIMESTAMP,
            output=FluxMetaData(data={"truc": "chose"}),
        ),
    ]


def test_register_and_get_last_exec_log(metadata_repository: FSMetaDataRepository) -> None:
    """``flux()`` returns only the most recently registered execution."""
    flux_id = "my_flux"
    metadata_repository.add_flux(flux_id)
    metadata_repository.register_flux_execution(
        flux_id,
        FIRST_TIMESTAMP,
        output={"truc": "machin"},
    )
    metadata_repository.register_flux_execution(
        flux_id,
        SECOND_TIMESTAMP,
        output={
            "truc": "chose",
        },
    )

    last_log = metadata_repository.flux(flux_id)
    assert last_log == ExecutionLog(
        datetime=SECOND_TIMESTAMP,
        output=FluxMetaData(data={"truc": "chose"}),
    )


def test_add_and_list_tables(metadata_repository: FSMetaDataRepository) -> None:
    """``tables()`` returns the added table ids, compared in insertion order."""
    table_ids = ["my_table", "table2", "blahblah"]
    for table_id in table_ids:
        metadata_repository.add_table(table_id)

    assert metadata_repository.tables() == table_ids


def test_register_table_modification(
    location: Path, metadata_repository: FSMetaDataRepository
) -> None:
    """Registering a modification appends one CSV row to the table metadata file."""
    table_id = "my_table"
    flux_id = "my_flux"
    metadata_repository.add_table(table_id)
    metadata_repository.register_table_modification(table_id, FIRST_TIMESTAMP, flux_id)

    metadata_filepath = _metadata_filepath(location, metadata_repository, "table", table_id)
    content = metadata_filepath.read_text(encoding="utf-8")
    assert content == "datetime,flux_id\n2023-03-15T14:30:00,my_flux\n"


def test_register_and_get_mod_logs(metadata_repository: FSMetaDataRepository) -> None:
    """``table_logs()`` returns every registered modification, oldest first."""
    table_id = "my_table"
    flux_id = "my_flux"
    metadata_repository.add_table(table_id)
    metadata_repository.register_table_modification(table_id, FIRST_TIMESTAMP, flux_id)
    metadata_repository.register_table_modification(table_id, SECOND_TIMESTAMP, flux_id)

    logs = metadata_repository.table_logs(table_id)
    assert logs == [
        ModificationLog(datetime=FIRST_TIMESTAMP, flux_id=flux_id),
        ModificationLog(datetime=SECOND_TIMESTAMP, flux_id=flux_id),
    ]


def test_register_and_get_last_log(metadata_repository: FSMetaDataRepository) -> None:
    """``table()`` returns only the most recently registered modification."""
    table_id = "my_table"
    flux_id = "my_flux"
    metadata_repository.add_table(table_id)
    metadata_repository.register_table_modification(table_id, FIRST_TIMESTAMP, flux_id)
    metadata_repository.register_table_modification(table_id, SECOND_TIMESTAMP, flux_id)

    last_log = metadata_repository.table(table_id)
    assert last_log == ModificationLog(datetime=SECOND_TIMESTAMP, flux_id=flux_id)