Compare commits

...

2 Commits

10 changed files with 231 additions and 20 deletions

View File

@ -5,7 +5,7 @@ from plesna.graph.graph_set import GraphSet
from plesna.models.flux import Flux, FluxMetaData from plesna.models.flux import Flux, FluxMetaData
from plesna.models.graphs import Node from plesna.models.graphs import Node
from plesna.models.libs.flux_graph import flux_to_edgeonset from plesna.models.libs.flux_graph import flux_to_edgeonset
from plesna.storage.repository.repository import Repository from plesna.storage.data_repository.data_repository import DataRepository
class DataPlateformError(Exception): class DataPlateformError(Exception):
@ -18,7 +18,7 @@ class DataPlateform:
self._fluxes = {} self._fluxes = {}
self._repositories = {} self._repositories = {}
def add_repository(self, repository: Repository) -> str: def add_repository(self, repository: DataRepository) -> str:
if repository.id in self._repositories: if repository.id in self._repositories:
raise DataPlateformError("The repository {repository.id} already exists") raise DataPlateformError("The repository {repository.id} already exists")
@ -29,7 +29,7 @@ class DataPlateform:
def repositories(self) -> list[str]: def repositories(self) -> list[str]:
return list(self._repositories) return list(self._repositories)
def repository(self, id: str) -> Repository: def repository(self, id: str) -> DataRepository:
return self._repositories[id] return self._repositories[id]
def add_flux(self, name: str, flux: Flux) -> str: def add_flux(self, name: str, flux: Flux) -> str:

View File

@ -3,7 +3,7 @@ import abc
from plesna.models.storage import Partition, Schema, Table from plesna.models.storage import Partition, Schema, Table
class Repository: class DataRepository:
def __init__(self, id: str, name: str): def __init__(self, id: str, name: str):
self._id = id self._id = id
self._name = name self._name = name

View File

@ -3,8 +3,8 @@ from pathlib import Path
from pydantic import BaseModel, computed_field from pydantic import BaseModel, computed_field
from plesna.libs.string_tools import extract_values_from_pattern from plesna.libs.string_tools import extract_values_from_pattern
from plesna.models.storage import Partition, Schema, Table from plesna.models.storage import Schema, Table
from plesna.storage.repository.repository import Repository from plesna.storage.data_repository.data_repository import DataRepository
class FSTable(BaseModel): class FSTable(BaseModel):
@ -58,8 +58,8 @@ class FSRepositoryError(ValueError):
pass pass
class FSRepository(Repository): class FSDataRepository(DataRepository):
"""Repository based on files tree structure """Data Repository based on files tree structure
- first level: schemas - first level: schemas
- second level: tables - second level: tables

View File

@ -0,0 +1,79 @@
import csv
import json
from pathlib import Path

from plesna.storage.metadata_repository.metadata_repository import (
    ExecutionMetaData,
    MetaDataRepository,
    ModificationMetaData,
)
class FSMetaDataRepositoryError(ValueError):
    """Error raised by the filesystem-backed metadata repository."""
class FSMetaDataRepository(MetaDataRepository):
    """MetaData Repository based on csv files

    Files organisation: executions and modifications are stored in csv files
    located directly under the base path and named according to the
    ``*_FILEMODEL`` templates.
    """

    EXECUTION_FILEMODEL = "{flux_id}_execution.csv"
    MODIFICATION_FILEMODEL = "{table_id}_modification.csv"

    def __init__(self, basepath: str):
        """Bind the repository to an existing directory.

        Args:
            basepath: directory holding the csv files; it must already exist.

        Raises:
            AssertionError: if ``basepath`` does not exist.
        """
        super().__init__()
        self._basepath = Path(basepath)
        # NOTE(review): an assert is stripped under ``python -O``; it is kept so
        # the exception type seen by existing callers does not change.
        assert self._basepath.exists(), f"The base path {self._basepath} does not exist."

    def _execution_filepath(self, flux_id: str) -> Path:
        """Return the path of the csv file storing the executions of ``flux_id``."""
        return self._basepath / self.EXECUTION_FILEMODEL.format(flux_id=flux_id)

    @staticmethod
    def _execution_fieldnames() -> list[str]:
        """Return the csv column names, i.e. the ExecutionMetaData field names in order."""
        # ``model_fields`` is the pydantic v2 accessor; ``__fields__`` is deprecated.
        return list(ExecutionMetaData.model_fields)

    def fluxes(self) -> list[str]:
        """List fluxes's ids"""
        raise NotImplementedError

    def add_flux(self, flux_id: str) -> str:
        """Create the execution csv file of a flux and write its header row.

        Calling it again for a flux whose file already has content leaves the
        file untouched, so the header is never duplicated.

        Args:
            flux_id: identifier of the flux.

        Returns:
            The ``flux_id`` that was passed in.
        """
        filepath = self._execution_filepath(flux_id)
        if not filepath.exists() or filepath.stat().st_size == 0:
            # newline="" lets the csv module control line endings itself.
            with open(filepath, "w", newline="", encoding="utf-8") as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=self._execution_fieldnames())
                writer.writeheader()
        return flux_id

    def register_flux_execution(self, flux_id: str, metadata: dict) -> ExecutionMetaData:
        """Append one execution record to the csv file of a flux.

        Args:
            flux_id: identifier of a flux previously created with ``add_flux``.
            metadata: keyword arguments used to build an ``ExecutionMetaData``.

        Returns:
            The ``ExecutionMetaData`` built from ``metadata``.

        Raises:
            FSMetaDataRepositoryError: if the flux has no execution file yet.
        """
        filepath = self._execution_filepath(flux_id)
        metadata_ = ExecutionMetaData(**metadata)
        # ``exists`` must be *called*: the bare bound method is always truthy.
        if not filepath.exists():
            raise FSMetaDataRepositoryError(f"The flux {flux_id} hasn't been added yet.")

        with open(filepath, "a", newline="", encoding="utf-8") as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=self._execution_fieldnames())
            writer.writerow(metadata_.to_flat_dict())

        return metadata_

    def flux(self, flux_id: str) -> ExecutionMetaData:
        """Get the flux last execution metadata"""
        raise NotImplementedError

    def flux_logs(self, flux_id: str) -> list[ExecutionMetaData]:
        """Read back every execution record stored for a flux.

        Args:
            flux_id: identifier of a flux previously created with ``add_flux``.

        Returns:
            One ``ExecutionMetaData`` per data row, in file order.

        Raises:
            FSMetaDataRepositoryError: if the flux has no execution file yet.
        """
        filepath = self._execution_filepath(flux_id)
        if not filepath.exists():
            raise FSMetaDataRepositoryError(f"The flux {flux_id} hasn't been added yet.")

        with open(filepath, "r", newline="", encoding="utf-8") as csvfile:
            # Without explicit fieldnames DictReader consumes the header row
            # instead of yielding it as a (bogus) data record.
            reader = csv.DictReader(csvfile)
            # Reverse ``ExecutionMetaData.to_flat_dict``: the datetime column is
            # a POSIX timestamp and the output column is a JSON document.
            # NOTE(review): pydantic presumably turns the timestamp into a
            # UTC-aware datetime - confirm this is the expected round-trip.
            return [
                ExecutionMetaData(
                    datetime=float(row["datetime"]),
                    output=json.loads(row["output"]),
                )
                for row in reader
            ]

    def tables(self) -> list[str]:
        """List all table's ids"""
        raise NotImplementedError

    def table(self, table_id: str) -> ModificationMetaData:
        """Get table's metadatas"""
        raise NotImplementedError

View File

@ -0,0 +1,78 @@
import abc
from datetime import datetime
from pydantic import BaseModel
from plesna.models.flux import FluxMetaData
class ModificationMetaData(BaseModel):
    """Metadata record describing one modification of a table."""

    # Moment attached to the modification.
    datetime: datetime
    # Identifier of a flux; presumably the flux that modified the table - TODO confirm.
    flux_id: str
class ExecutionMetaData(BaseModel):
    """Metadata record describing one execution of a flux."""

    # Moment attached to the execution.
    datetime: datetime
    # Metadata produced by the flux for this execution.
    output: FluxMetaData

    def to_flat_dict(self):
        """Flatten the record into scalar values, one per field.

        Returns:
            A dict with ``datetime`` as a POSIX timestamp and ``output``
            serialised to a JSON string.
        """
        epoch_seconds = self.datetime.timestamp()
        serialized_output = self.output.model_dump_json()
        return dict(datetime=epoch_seconds, output=serialized_output)
class MetaDataRepository:
    """Interface of an object storing metadata about fluxes and tables.

    Every method is a stub raising ``NotImplementedError``; concrete
    repositories are expected to override them.

    NOTE(review): the methods are decorated with ``abc.abstractmethod`` but the
    class does not derive from ``abc.ABC``, so instantiation is not blocked
    when a subclass leaves some of them unimplemented.
    """

    def __init__(self):
        """Nothing to initialise at the interface level."""

    @abc.abstractmethod
    def fluxes(self) -> list[str]:
        """List the ids of the fluxes."""
        raise NotImplementedError

    @abc.abstractmethod
    def add_flux(self, flux_id: str) -> str:
        """Add the flux identified by ``flux_id`` to the repository."""
        raise NotImplementedError

    @abc.abstractmethod
    def register_flux_execution(self, flux_id: str, metadata: ExecutionMetaData) -> str:
        """Record the execution ``metadata`` for the flux ``flux_id``."""
        raise NotImplementedError

    @abc.abstractmethod
    def flux(self, schema_id: str) -> ExecutionMetaData:
        """Return the metadata of the last execution of a flux."""
        raise NotImplementedError

    @abc.abstractmethod
    def flux_logs(self, schema_id: str) -> list[ExecutionMetaData]:
        """Return the metadata of every execution of a flux."""
        raise NotImplementedError

    @abc.abstractmethod
    def tables(self) -> list[str]:
        """List the ids of all tables."""
        raise NotImplementedError

    @abc.abstractmethod
    def add_table(self, table_id: str) -> str:
        """Add the table identified by ``table_id`` to the repository."""
        raise NotImplementedError

    @abc.abstractmethod
    def register_table_modification(self, table_id: str, metadata: ModificationMetaData) -> str:
        """Record the modification ``metadata`` for the table ``table_id``."""
        raise NotImplementedError

    @abc.abstractmethod
    def table(self, table_id: str) -> ModificationMetaData:
        """Return the metadata of the last modification of a table."""
        raise NotImplementedError

    @abc.abstractmethod
    def table_logs(self, table_id: str) -> list[ModificationMetaData]:
        """Return the metadata of every modification of a table."""
        raise NotImplementedError

View File

@ -6,13 +6,13 @@ import pytest
from plesna.dataplatform import DataPlateform from plesna.dataplatform import DataPlateform
from plesna.models.graphs import Edge, EdgeOnSet, Node from plesna.models.graphs import Edge, EdgeOnSet, Node
from plesna.models.flux import Flux, Transformation from plesna.models.flux import Flux, Transformation
from plesna.storage.repository.fs_repository import FSRepository from plesna.storage.data_repository.fs_data_repository import FSDataRepository
FIXTURE_DIR = Path(__file__).parent.parent / Path("raw_datas") FIXTURE_DIR = Path(__file__).parent.parent / Path("raw_datas")
@pytest.fixture @pytest.fixture
def repository(tmp_path) -> FSRepository: def repository(tmp_path) -> FSDataRepository:
example_src = FIXTURE_DIR example_src = FIXTURE_DIR
assert example_src.exists() assert example_src.exists()
@ -24,11 +24,11 @@ def repository(tmp_path) -> FSRepository:
silver_path = Path(tmp_path) / "silver" silver_path = Path(tmp_path) / "silver"
silver_path.mkdir() silver_path.mkdir()
return FSRepository("test", "test", tmp_path) return FSDataRepository("test", "test", tmp_path)
def test_add_repository( def test_add_repository(
repository: FSRepository, repository: FSDataRepository,
): ):
dp = DataPlateform() dp = DataPlateform()
dp.add_repository(repository) dp.add_repository(repository)
@ -39,7 +39,7 @@ def test_add_repository(
@pytest.fixture @pytest.fixture
def copy_flux(repository: FSRepository) -> Flux: def copy_flux(repository: FSDataRepository) -> Flux:
raw_username = [repository.table("test-raw-username")] raw_username = [repository.table("test-raw-username")]
bronze_username = [repository.table("test-bronze-username")] bronze_username = [repository.table("test-bronze-username")]
@ -62,7 +62,7 @@ def copy_flux(repository: FSRepository) -> Flux:
@pytest.fixture @pytest.fixture
def foo_flux(repository: FSRepository) -> Flux: def foo_flux(repository: FSDataRepository) -> Flux:
src = [ src = [
repository.table("test-raw-username"), repository.table("test-raw-username"),
repository.table("test-raw-recovery"), repository.table("test-raw-recovery"),
@ -84,7 +84,7 @@ def foo_flux(repository: FSRepository) -> Flux:
return flux return flux
def test_add_flux(repository: FSRepository, copy_flux: Flux): def test_add_flux(repository: FSDataRepository, copy_flux: Flux):
dataplatform = DataPlateform() dataplatform = DataPlateform()
dataplatform.add_repository(repository) dataplatform.add_repository(repository)
@ -99,7 +99,7 @@ def test_add_flux(repository: FSRepository, copy_flux: Flux):
@pytest.fixture @pytest.fixture
def dataplatform( def dataplatform(
repository: FSRepository, repository: FSDataRepository,
foo_flux: Flux, foo_flux: Flux,
copy_flux: Flux, copy_flux: Flux,
) -> DataPlateform: ) -> DataPlateform:

View File

@ -3,7 +3,7 @@ from pathlib import Path
import pytest import pytest
from plesna.storage.repository.fs_repository import FSRepository from plesna.storage.data_repository.fs_data_repository import FSDataRepository
FIXTURE_DIR = Path(__file__).parent.parent / Path("./raw_datas/") FIXTURE_DIR = Path(__file__).parent.parent / Path("./raw_datas/")
@ -20,7 +20,7 @@ def location(tmp_path):
def test_init(location): def test_init(location):
repo = FSRepository("example", "example", location) repo = FSDataRepository("example", "example", location)
assert repo.ls() == [ assert repo.ls() == [
"schema", "schema",
] ]
@ -44,8 +44,8 @@ def test_init(location):
@pytest.fixture @pytest.fixture
def repository(location) -> FSRepository: def repository(location) -> FSDataRepository:
return FSRepository("repo_id", "example", location) return FSDataRepository("repo_id", "example", location)
def test_list_schemas(repository): def test_list_schemas(repository):

View File

@ -0,0 +1,54 @@
from datetime import datetime
import shutil
from pathlib import Path
import pytest
from plesna.storage.metadata_repository.fs_metadata_repository import FSMetaDataRepository
@pytest.fixture
def location(tmp_path):
    """Create a ``catalog`` directory under ``tmp_path`` and return its path."""
    catalog_dir = tmp_path / "catalog"
    catalog_dir.mkdir()
    return catalog_dir
def test_init(location):
    """Building a repository on an existing directory must not raise."""
    FSMetaDataRepository(location)
@pytest.fixture
def metadata_repository(location) -> FSMetaDataRepository:
    """Provide a repository bound to the ``location`` fixture directory."""
    repo = FSMetaDataRepository(location)
    return repo
def test_add_flux(location, metadata_repository):
    """Adding a flux creates its execution csv holding only the header row."""
    flux_id = "my_flux"
    metadata_repository.add_flux(flux_id)

    execution_file = location / metadata_repository.EXECUTION_FILEMODEL.format(flux_id=flux_id)
    assert execution_file.exists()

    # read_text opens the file in text mode, like the built-in open(..., "r").
    assert execution_file.read_text() == "datetime,output\n"
def test_register_flux_execution(location, metadata_repository):
    """Registering an execution appends one csv row after the header."""
    flux_id = "my_flux"
    executed_at = datetime(2023, 3, 15, 14, 30)
    metadata_repository.add_flux(flux_id)

    metadata_repository.register_flux_execution(
        flux_id,
        metadata={
            "datetime": executed_at,
            "output": {"data": {"truc": "machin"}},
        },
    )

    metadata_filepath = location / metadata_repository.EXECUTION_FILEMODEL.format(flux_id=flux_id)
    with open(metadata_filepath, "r") as csvfile:
        content = csvfile.read()

    # ``executed_at`` is naive, so its POSIX timestamp depends on the timezone
    # of the machine running the test; derive the expected value from it
    # instead of hard-coding the figure obtained on a UTC+1 machine.
    expected_row = f"{executed_at.timestamp()}," + '"{""data"":{""truc"":""machin""}}"'
    assert content == "datetime,output\n" + expected_row + "\n"