183 lines
		
	
	
		
			5.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			183 lines
		
	
	
		
			5.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from datetime import datetime
 | |
| import shutil
 | |
| from pathlib import Path
 | |
| 
 | |
| import pytest
 | |
| 
 | |
| from plesna.models.flux import FluxMetaData
 | |
| from plesna.storage.metadata_repository.fs_metadata_repository import FSMetaDataRepository
 | |
| from plesna.storage.metadata_repository.metadata_repository import ExecutionLog, ModificationLog
 | |
| 
 | |
| 
 | |
| @pytest.fixture
 | |
| def location(tmp_path):
 | |
|     catalogpath = tmp_path / "catalog"
 | |
|     catalogpath.mkdir()
 | |
| 
 | |
|     return catalogpath
 | |
| 
 | |
| 
 | |
| def test_init(location):
 | |
|     repo = FSMetaDataRepository(location)
 | |
| 
 | |
| 
 | |
| @pytest.fixture
 | |
| def metadata_repository(location) -> FSMetaDataRepository:
 | |
|     return FSMetaDataRepository(location)
 | |
| 
 | |
| 
 | |
| def test_add_flux(location, metadata_repository):
 | |
|     flux_id = "my_flux"
 | |
|     metadata_repository.add_flux(flux_id)
 | |
| 
 | |
|     metadata_filepath = location / metadata_repository.OBJECTS["flux"]["filemodel"].format(
 | |
|         id=flux_id
 | |
|     )
 | |
|     assert metadata_filepath.exists()
 | |
| 
 | |
|     with open(metadata_filepath, "r") as csvfile:
 | |
|         content = csvfile.read()
 | |
|         assert content == "datetime,output\n"
 | |
| 
 | |
| 
 | |
| def test_add_and_list_fluxes(metadata_repository):
 | |
|     flux_ids = ["my_flux", "flux2", "blahblah"]
 | |
|     for f in flux_ids:
 | |
|         metadata_repository.add_flux(f)
 | |
|     assert metadata_repository.fluxes() == flux_ids
 | |
| 
 | |
| 
 | |
| def test_register_flux_execution(location, metadata_repository):
 | |
|     flux_id = "my_flux"
 | |
|     metadata_repository.add_flux(flux_id)
 | |
| 
 | |
|     metadata_repository.register_flux_execution(
 | |
|         flux_id,
 | |
|         datetime(2023, 3, 15, 14, 30),
 | |
|         output={
 | |
|             "truc": "machin",
 | |
|         },
 | |
|     )
 | |
| 
 | |
|     metadata_filepath = location / metadata_repository.OBJECTS["flux"]["filemodel"].format(
 | |
|         id=flux_id
 | |
|     )
 | |
|     with open(metadata_filepath, "r") as csvfile:
 | |
|         content = csvfile.read()
 | |
|         assert (
 | |
|             content == 'datetime,output\n2023-03-15T14:30:00,"{""data"":{""truc"":""machin""}}"\n'
 | |
|         )
 | |
| 
 | |
| 
 | |
| def test_register_and_get_exec_logs(metadata_repository):
 | |
|     flux_id = "my_flux"
 | |
|     metadata_repository.add_flux(flux_id)
 | |
| 
 | |
|     metadata_repository.register_flux_execution(
 | |
|         flux_id,
 | |
|         datetime(2023, 3, 15, 14, 30),
 | |
|         output={"truc": "machin"},
 | |
|     )
 | |
|     metadata_repository.register_flux_execution(
 | |
|         flux_id,
 | |
|         datetime(2024, 3, 15, 14, 30),
 | |
|         output={
 | |
|             "truc": "chose",
 | |
|         },
 | |
|     )
 | |
| 
 | |
|     logs = metadata_repository.flux_logs(flux_id)
 | |
|     assert logs == [
 | |
|         ExecutionLog(
 | |
|             datetime=datetime(2023, 3, 15, 14, 30),
 | |
|             output=FluxMetaData(data={"truc": "machin"}),
 | |
|         ),
 | |
|         ExecutionLog(
 | |
|             datetime=datetime(2024, 3, 15, 14, 30),
 | |
|             output=FluxMetaData(data={"truc": "chose"}),
 | |
|         ),
 | |
|     ]
 | |
| 
 | |
| 
 | |
| def test_register_and_get_last_exec_log(metadata_repository):
 | |
|     flux_id = "my_flux"
 | |
|     metadata_repository.add_flux(flux_id)
 | |
| 
 | |
|     metadata_repository.register_flux_execution(
 | |
|         flux_id,
 | |
|         datetime(2023, 3, 15, 14, 30),
 | |
|         output={"truc": "machin"},
 | |
|     )
 | |
|     metadata_repository.register_flux_execution(
 | |
|         flux_id,
 | |
|         datetime(2024, 3, 15, 14, 30),
 | |
|         output={
 | |
|             "truc": "chose",
 | |
|         },
 | |
|     )
 | |
| 
 | |
|     logs = metadata_repository.flux(flux_id)
 | |
|     assert logs == ExecutionLog(
 | |
|         datetime=datetime(2024, 3, 15, 14, 30),
 | |
|         output=FluxMetaData(data={"truc": "chose"}),
 | |
|     )
 | |
| 
 | |
| 
 | |
| def test_add_and_list_tables(metadata_repository):
 | |
|     table_ids = ["my_table", "table2", "blahblah"]
 | |
|     for f in table_ids:
 | |
|         metadata_repository.add_table(f)
 | |
|     assert metadata_repository.tables() == table_ids
 | |
| 
 | |
| 
 | |
| def test_register_table_modification(location, metadata_repository):
 | |
|     table_id = "my_table"
 | |
|     flux_id = "my_flux"
 | |
|     metadata_repository.add_table(table_id)
 | |
| 
 | |
|     metadata_repository.register_table_modification(
 | |
|         table_id, datetime(2023, 3, 15, 14, 30), flux_id
 | |
|     )
 | |
| 
 | |
|     metadata_filepath = location / metadata_repository.OBJECTS["table"]["filemodel"].format(
 | |
|         id=table_id
 | |
|     )
 | |
|     with open(metadata_filepath, "r") as csvfile:
 | |
|         content = csvfile.read()
 | |
|         assert content == "datetime,flux_id\n2023-03-15T14:30:00,my_flux\n"
 | |
| 
 | |
| 
 | |
| def test_register_and_get_mod_logs(metadata_repository):
 | |
|     table_id = "my_table"
 | |
|     flux_id = "my_flux"
 | |
|     metadata_repository.add_table(table_id)
 | |
| 
 | |
|     metadata_repository.register_table_modification(
 | |
|         table_id, datetime(2023, 3, 15, 14, 30), flux_id
 | |
|     )
 | |
|     metadata_repository.register_table_modification(
 | |
|         table_id, datetime(2024, 3, 15, 14, 30), flux_id
 | |
|     )
 | |
| 
 | |
|     logs = metadata_repository.table_logs(table_id)
 | |
|     assert logs == [
 | |
|         ModificationLog(datetime=datetime(2023, 3, 15, 14, 30), flux_id=flux_id),
 | |
|         ModificationLog(datetime=datetime(2024, 3, 15, 14, 30), flux_id=flux_id),
 | |
|     ]
 | |
| 
 | |
| 
 | |
| def test_register_and_get_last_log(metadata_repository):
 | |
|     table_id = "my_table"
 | |
|     flux_id = "my_flux"
 | |
|     metadata_repository.add_table(table_id)
 | |
| 
 | |
|     metadata_repository.register_table_modification(
 | |
|         table_id, datetime(2023, 3, 15, 14, 30), flux_id
 | |
|     )
 | |
|     metadata_repository.register_table_modification(
 | |
|         table_id, datetime(2024, 3, 15, 14, 30), flux_id
 | |
|     )
 | |
| 
 | |
|     logs = metadata_repository.table(table_id)
 | |
|     assert logs == ModificationLog(datetime=datetime(2024, 3, 15, 14, 30), flux_id=flux_id)
 |