"""Tests for the filesystem-backed metadata repository (FSMetaDataRepository)."""
from datetime import datetime
|
|
import shutil
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from plesna.models.flux import FluxMetaData
|
|
from plesna.storage.metadata_repository.fs_metadata_repository import FSMetaDataRepository
|
|
from plesna.storage.metadata_repository.metadata_repository import ExecutionLog, ModificationLog
|
|
|
|
|
|
@pytest.fixture
def location(tmp_path):
    """Provide a fresh, empty ``catalog`` directory for each test."""
    path = tmp_path / "catalog"
    path.mkdir()
    return path
|
|
|
|
|
|
def test_init(location):
    """Constructing the repository on an existing directory must not raise.

    The return value is deliberately discarded: the previous ``repo`` local
    was assigned but never used.
    """
    FSMetaDataRepository(location)
|
|
|
|
|
|
@pytest.fixture
def metadata_repository(location) -> FSMetaDataRepository:
    """A repository rooted at the per-test catalog directory."""
    return FSMetaDataRepository(location)
|
|
|
|
|
|
def test_add_flux(location, metadata_repository):
    """Adding a flux creates its metadata file, seeded with the CSV header."""
    metadata_repository.add_flux("my_flux")

    relative = metadata_repository.OBJECTS["flux"]["filemodel"].format(id="my_flux")
    metadata_file = location / relative
    assert metadata_file.exists()
    assert metadata_file.read_text() == "datetime,output\n"
|
|
|
|
|
|
def test_add_and_list_fluxes(metadata_repository):
    """Every added flux is reported by ``fluxes()``.

    Compared as sorted lists: the listing presumably reflects filesystem
    enumeration order, which is platform-dependent, so asserting exact
    insertion order would make the test fragile.
    """
    flux_ids = ["my_flux", "flux2", "blahblah"]
    for flux_id in flux_ids:
        metadata_repository.add_flux(flux_id)
    assert sorted(metadata_repository.fluxes()) == sorted(flux_ids)
|
|
|
|
|
|
def test_register_flux_execution(location, metadata_repository):
    """A registered execution is appended to the flux metadata CSV."""
    flux_id = "my_flux"
    metadata_repository.add_flux(flux_id)

    metadata_repository.register_flux_execution(
        flux_id,
        datetime(2023, 3, 15, 14, 30),
        output={"truc": "machin"},
    )

    filename = metadata_repository.OBJECTS["flux"]["filemodel"].format(id=flux_id)
    expected = 'datetime,output\n2023-03-15T14:30:00,"{""data"":{""truc"":""machin""}}"\n'
    assert (location / filename).read_text() == expected
|
|
|
|
|
|
def test_register_and_get_exec_logs(metadata_repository):
    """``flux_logs`` returns every registered execution, oldest first."""
    flux_id = "my_flux"
    metadata_repository.add_flux(flux_id)

    runs = [
        (datetime(2023, 3, 15, 14, 30), {"truc": "machin"}),
        (datetime(2024, 3, 15, 14, 30), {"truc": "chose"}),
    ]
    for when, payload in runs:
        metadata_repository.register_flux_execution(flux_id, when, output=payload)

    expected = [
        ExecutionLog(datetime=when, output=FluxMetaData(data=payload))
        for when, payload in runs
    ]
    assert metadata_repository.flux_logs(flux_id) == expected
|
|
|
|
|
|
def test_register_and_get_last_exec_log(metadata_repository):
    """``flux`` returns only the most recent execution log."""
    flux_id = "my_flux"
    metadata_repository.add_flux(flux_id)

    runs = [
        (datetime(2023, 3, 15, 14, 30), {"truc": "machin"}),
        (datetime(2024, 3, 15, 14, 30), {"truc": "chose"}),
    ]
    for when, payload in runs:
        metadata_repository.register_flux_execution(flux_id, when, output=payload)

    last_when, last_payload = runs[-1]
    assert metadata_repository.flux(flux_id) == ExecutionLog(
        datetime=last_when,
        output=FluxMetaData(data=last_payload),
    )
|
|
|
|
|
|
def test_add_and_list_tables(metadata_repository):
    """Every added table is reported by ``tables()``.

    Compared as sorted lists: the listing presumably reflects filesystem
    enumeration order, which is platform-dependent, so asserting exact
    insertion order would make the test fragile.
    """
    table_ids = ["my_table", "table2", "blahblah"]
    for table_id in table_ids:
        metadata_repository.add_table(table_id)
    assert sorted(metadata_repository.tables()) == sorted(table_ids)
|
|
|
|
|
|
def test_register_table_modification(location, metadata_repository):
    """A registered modification is appended to the table metadata CSV."""
    table_id = "my_table"
    metadata_repository.add_table(table_id)

    metadata_repository.register_table_modification(
        table_id, datetime(2023, 3, 15, 14, 30), "my_flux"
    )

    filename = metadata_repository.OBJECTS["table"]["filemodel"].format(id=table_id)
    content = (location / filename).read_text()
    assert content == "datetime,flux_id\n2023-03-15T14:30:00,my_flux\n"
|
|
|
|
|
|
def test_register_and_get_mod_logs(metadata_repository):
    """``table_logs`` returns every registered modification, oldest first."""
    table_id = "my_table"
    flux_id = "my_flux"
    metadata_repository.add_table(table_id)

    timestamps = [datetime(2023, 3, 15, 14, 30), datetime(2024, 3, 15, 14, 30)]
    for stamp in timestamps:
        metadata_repository.register_table_modification(table_id, stamp, flux_id)

    assert metadata_repository.table_logs(table_id) == [
        ModificationLog(datetime=stamp, flux_id=flux_id) for stamp in timestamps
    ]
|
|
|
|
|
|
def test_register_and_get_last_log(metadata_repository):
    """``table`` returns only the most recent modification log."""
    table_id = "my_table"
    flux_id = "my_flux"
    metadata_repository.add_table(table_id)

    timestamps = [datetime(2023, 3, 15, 14, 30), datetime(2024, 3, 15, 14, 30)]
    for stamp in timestamps:
        metadata_repository.register_table_modification(table_id, stamp, flux_id)

    assert metadata_repository.table(table_id) == ModificationLog(
        datetime=timestamps[-1], flux_id=flux_id
    )
|