Feat: start working on metadata_repository
This commit is contained in:
parent
1a49158afa
commit
543b3fe98e
0
plesna/storage/metadata_repository/__init__.py
Normal file
0
plesna/storage/metadata_repository/__init__.py
Normal file
79
plesna/storage/metadata_repository/fs_metadata_repository.py
Normal file
79
plesna/storage/metadata_repository/fs_metadata_repository.py
Normal file
@ -0,0 +1,79 @@
|
||||
from pathlib import Path
|
||||
import csv
|
||||
|
||||
from plesna.storage.metadata_repository.metadata_repository import (
|
||||
ExecutionMetaData,
|
||||
MetaDataRepository,
|
||||
ModificationMetaData,
|
||||
)
|
||||
|
||||
|
||||
class FSMetaDataRepositoryError(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
class FSMetaDataRepository(MetaDataRepository):
|
||||
"""MetaData Repository based on csv files
|
||||
|
||||
Files organisations: executions and modifications are stored in csv file according to ***_FILEMODEL
|
||||
|
||||
"""
|
||||
|
||||
EXECUTION_FILEMODEL = "{flux_id}_execution.csv"
|
||||
MODIFICATION_FILEMODEL = "{table_id}_modification.csv"
|
||||
|
||||
def __init__(self, basepath: str):
|
||||
super().__init__()
|
||||
|
||||
self._basepath = Path(basepath)
|
||||
assert self._basepath.exists()
|
||||
|
||||
def fluxes(self) -> list[str]:
|
||||
"""List fluxes's ids"""
|
||||
raise NotImplementedError
|
||||
|
||||
def add_flux(self, flux_id: str) -> str:
|
||||
"""Get the flux metadata"""
|
||||
filepath = self._basepath / self.EXECUTION_FILEMODEL.format(flux_id=flux_id)
|
||||
filepath.touch()
|
||||
with open(filepath, "a") as csvfile:
|
||||
writer = csv.DictWriter(csvfile, fieldnames=ExecutionMetaData.__fields__.keys())
|
||||
writer.writeheader()
|
||||
return flux_id
|
||||
|
||||
def register_flux_execution(self, flux_id: str, metadata: dict) -> ExecutionMetaData:
|
||||
"""Get the flux metadata"""
|
||||
filepath = self._basepath / self.EXECUTION_FILEMODEL.format(flux_id=flux_id)
|
||||
metadata_ = ExecutionMetaData(**metadata)
|
||||
if not filepath.exists:
|
||||
raise FSMetaDataRepositoryError(f"The flux {flux_id} hasn't been added yet.")
|
||||
|
||||
with open(filepath, "a") as csvfile:
|
||||
writer = csv.DictWriter(csvfile, fieldnames=ExecutionMetaData.__fields__.keys())
|
||||
writer.writerow(metadata_.to_flat_dict())
|
||||
|
||||
return metadata_
|
||||
|
||||
def flux(self, flux_id: str) -> ExecutionMetaData:
|
||||
"""Get the flux metadata"""
|
||||
raise NotImplementedError
|
||||
|
||||
def flux_logs(self, flux_id: str) -> list[ExecutionMetaData]:
|
||||
"""Get the flux metadata"""
|
||||
filepath = self._basepath / self.EXECUTION_FILEMODEL.format(flux_id=flux_id)
|
||||
if not filepath.exists:
|
||||
raise FSMetaDataRepositoryError(f"The flux {flux_id} hasn't been added yet.")
|
||||
with open(filepath, "r") as csvfile:
|
||||
logs = []
|
||||
reader = csv.DictReader(csvfile, fieldnames=ExecutionMetaData.__fields__.keys())
|
||||
for row in reader:
|
||||
logs.append(ExecutionMetaData(**row))
|
||||
return logs
|
||||
|
||||
def tables(self) -> list[str]:
|
||||
"""List table's name in schema (the id)"""
|
||||
raise NotImplementedError
|
||||
|
||||
def table(self, table_id: str) -> ModificationMetaData:
|
||||
"""Get table's metadatas"""
|
||||
raise NotImplementedError
|
78
plesna/storage/metadata_repository/metadata_repository.py
Normal file
78
plesna/storage/metadata_repository/metadata_repository.py
Normal file
@ -0,0 +1,78 @@
|
||||
import abc
|
||||
from datetime import datetime
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from plesna.models.flux import FluxMetaData
|
||||
|
||||
|
||||
class ModificationMetaData(BaseModel):
|
||||
datetime: datetime
|
||||
flux_id: str
|
||||
|
||||
|
||||
class ExecutionMetaData(BaseModel):
|
||||
datetime: datetime
|
||||
output: FluxMetaData
|
||||
|
||||
def to_flat_dict(self):
|
||||
return {"datetime": self.datetime.timestamp(), "output": self.output.model_dump_json()}
|
||||
|
||||
|
||||
class MetaDataRepository:
|
||||
"""Object that stores metadata about flux, schema, tables"""
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def fluxes(self) -> list[str]:
|
||||
"""List fluxes's ids"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def add_flux(self, flux_id: str) -> str:
|
||||
"""Get the flux metadata"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def register_flux_execution(self, flux_id: str, metadata: ExecutionMetaData) -> str:
|
||||
"""Get the flux metadata"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def flux(self, schema_id: str) -> ExecutionMetaData:
|
||||
"""Get the flux last execution metadata"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def flux_logs(self, schema_id: str) -> list[ExecutionMetaData]:
|
||||
"""Get all the flux execution metadata"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def tables(
|
||||
self,
|
||||
) -> list[str]:
|
||||
"""List all table's ids"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def add_table(self, table_id: str) -> str:
|
||||
"""Get the table metadata"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def register_table_modification(self, table_id: str, metadata: ModificationMetaData) -> str:
|
||||
"""Get the table metadata"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def table(self, table_id: str) -> ModificationMetaData:
|
||||
"""Get the last table's modification metadatas"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def table_logs(self, table_id: str) -> list[ModificationMetaData]:
|
||||
"""Get all table's modification metadatas"""
|
||||
raise NotImplementedError
|
54
tests/storage/test_fs_metadata_repository.py
Normal file
54
tests/storage/test_fs_metadata_repository.py
Normal file
@ -0,0 +1,54 @@
|
||||
from datetime import datetime
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from plesna.storage.metadata_repository.fs_metadata_repository import FSMetaDataRepository
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def location(tmp_path):
|
||||
catalogpath = tmp_path / "catalog"
|
||||
catalogpath.mkdir()
|
||||
|
||||
return catalogpath
|
||||
|
||||
|
||||
def test_init(location):
|
||||
repo = FSMetaDataRepository(location)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def metadata_repository(location) -> FSMetaDataRepository:
|
||||
return FSMetaDataRepository(location)
|
||||
|
||||
|
||||
def test_add_flux(location, metadata_repository):
|
||||
flux_id = "my_flux"
|
||||
metadata_repository.add_flux(flux_id)
|
||||
|
||||
metadata_filepath = location / metadata_repository.EXECUTION_FILEMODEL.format(flux_id=flux_id)
|
||||
assert metadata_filepath.exists()
|
||||
|
||||
with open(metadata_filepath, "r") as csvfile:
|
||||
content = csvfile.read()
|
||||
assert content == "datetime,output\n"
|
||||
|
||||
|
||||
def test_register_flux_execution(location, metadata_repository):
|
||||
flux_id = "my_flux"
|
||||
metadata_repository.add_flux(flux_id)
|
||||
|
||||
metadata_repository.register_flux_execution(
|
||||
flux_id,
|
||||
metadata={
|
||||
"datetime": datetime(2023, 3, 15, 14, 30),
|
||||
"output": {"data": {"truc": "machin"}},
|
||||
},
|
||||
)
|
||||
|
||||
metadata_filepath = location / metadata_repository.EXECUTION_FILEMODEL.format(flux_id=flux_id)
|
||||
with open(metadata_filepath, "r") as csvfile:
|
||||
content = csvfile.read()
|
||||
assert content == 'datetime,output\n1678887000.0,"{""data"":{""truc"":""machin""}}"\n'
|
Loading…
Reference in New Issue
Block a user