Compare commits: bb691acc14 ... 543b3fe98e

2 commits:
- 543b3fe98e
- 1a49158afa
plesna/dataplatform.py

@@ -5,7 +5,7 @@ from plesna.graph.graph_set import GraphSet
 from plesna.models.flux import Flux, FluxMetaData
 from plesna.models.graphs import Node
 from plesna.models.libs.flux_graph import flux_to_edgeonset
-from plesna.storage.repository.repository import Repository
+from plesna.storage.data_repository.data_repository import DataRepository


 class DataPlateformError(Exception):
@@ -18,7 +18,7 @@ class DataPlateform:
         self._fluxes = {}
         self._repositories = {}

-    def add_repository(self, repository: Repository) -> str:
+    def add_repository(self, repository: DataRepository) -> str:
         if repository.id in self._repositories:
             raise DataPlateformError(f"The repository {repository.id} already exists")

@@ -29,7 +29,7 @@ class DataPlateform:
     def repositories(self) -> list[str]:
         return list(self._repositories)

-    def repository(self, id: str) -> Repository:
+    def repository(self, id: str) -> DataRepository:
         return self._repositories[id]

     def add_flux(self, name: str, flux: Flux) -> str:
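In short, the platform keeps a registry keyed by repository id. A minimal sketch of that behaviour (hypothetical snippet: the constructor arguments are patterned on the tests below, and the line that actually stores the repository after the duplicate check is not shown in this hunk):

from plesna.dataplatform import DataPlateform
from plesna.storage.data_repository.fs_data_repository import FSDataRepository

dp = DataPlateform()
repo = FSDataRepository("test", "test", "/tmp/data")  # hypothetical basepath
dp.add_repository(repo)
assert dp.repositories() == ["test"]   # repositories() lists registered ids
assert dp.repository("test") is repo   # repository(id) returns the object
# dp.add_repository(repo)  # would raise DataPlateformError: already exists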
plesna/storage/repository/repository.py → plesna/storage/data_repository/data_repository.py

@@ -3,7 +3,7 @@ import abc
 from plesna.models.storage import Partition, Schema, Table


-class Repository:
+class DataRepository:
     def __init__(self, id: str, name: str):
         self._id = id
         self._name = name
plesna/storage/repository/fs_repository.py → plesna/storage/data_repository/fs_data_repository.py

@@ -3,8 +3,8 @@ from pathlib import Path
 from pydantic import BaseModel, computed_field

 from plesna.libs.string_tools import extract_values_from_pattern
-from plesna.models.storage import Partition, Schema, Table
-from plesna.storage.repository.repository import Repository
+from plesna.models.storage import Schema, Table
+from plesna.storage.data_repository.data_repository import DataRepository


 class FSTable(BaseModel):
@@ -58,8 +58,8 @@ class FSRepositoryError(ValueError):
     pass


-class FSRepository(Repository):
-    """Repository based on files tree structure
+class FSDataRepository(DataRepository):
+    """Data Repository based on files tree structure

     - first level: schemas
     - second level: tables
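The two-level layout described in the docstring amounts to a tree like this (an illustrative example; the schema and table names are hypothetical, patterned on ids such as test-raw-username used in the tests below):

basepath/
├── raw/           # schema
│   └── username   # table
└── bronze/
    └── username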
plesna/storage/metadata_repository/__init__.py (new file, 0 lines)

plesna/storage/metadata_repository/fs_metadata_repository.py (new file, 79 lines)

@@ -0,0 +1,79 @@
from pathlib import Path
import csv

from plesna.storage.metadata_repository.metadata_repository import (
    ExecutionMetaData,
    MetaDataRepository,
    ModificationMetaData,
)


class FSMetaDataRepositoryError(ValueError):
    pass


class FSMetaDataRepository(MetaDataRepository):
    """MetaData repository based on csv files

    File organisation: executions and modifications are stored in csv files
    named according to ***_FILEMODEL.
    """

    EXECUTION_FILEMODEL = "{flux_id}_execution.csv"
    MODIFICATION_FILEMODEL = "{table_id}_modification.csv"

    def __init__(self, basepath: str):
        super().__init__()

        self._basepath = Path(basepath)
        assert self._basepath.exists()

    def fluxes(self) -> list[str]:
        """List fluxes' ids"""
        raise NotImplementedError

    def add_flux(self, flux_id: str) -> str:
        """Register a flux: create its execution log with a header row"""
        filepath = self._basepath / self.EXECUTION_FILEMODEL.format(flux_id=flux_id)
        filepath.touch()
        with open(filepath, "a") as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=ExecutionMetaData.__fields__.keys())
            writer.writeheader()
        return flux_id

    def register_flux_execution(self, flux_id: str, metadata: dict) -> ExecutionMetaData:
        """Append an execution record to the flux's log"""
        filepath = self._basepath / self.EXECUTION_FILEMODEL.format(flux_id=flux_id)
        metadata_ = ExecutionMetaData(**metadata)
        if not filepath.exists():  # .exists is a method; without the call the check never fires
            raise FSMetaDataRepositoryError(f"The flux {flux_id} hasn't been added yet.")

        with open(filepath, "a") as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=ExecutionMetaData.__fields__.keys())
            writer.writerow(metadata_.to_flat_dict())

        return metadata_

    def flux(self, flux_id: str) -> ExecutionMetaData:
        """Get the flux's last execution metadata"""
        raise NotImplementedError

    def flux_logs(self, flux_id: str) -> list[ExecutionMetaData]:
        """Get all the flux's execution metadata"""
        filepath = self._basepath / self.EXECUTION_FILEMODEL.format(flux_id=flux_id)
        if not filepath.exists():
            raise FSMetaDataRepositoryError(f"The flux {flux_id} hasn't been added yet.")
        with open(filepath, "r") as csvfile:
            logs = []
            # Let DictReader take the field names from the header row; passing
            # fieldnames= explicitly would make it parse the header as a data row.
            reader = csv.DictReader(csvfile)
            for row in reader:
                logs.append(ExecutionMetaData(**row))
        return logs

    def tables(self) -> list[str]:
        """List tables' names in the schema (their ids)"""
        raise NotImplementedError

    def table(self, table_id: str) -> ModificationMetaData:
        """Get the table's metadata"""
        raise NotImplementedError
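For concreteness, a small sketch of the file-naming scheme (the flux id is borrowed from the tests below; "users" is a hypothetical table id):

FSMetaDataRepository.EXECUTION_FILEMODEL.format(flux_id="my_flux")
# -> 'my_flux_execution.csv', a csv log with header "datetime,output"
FSMetaDataRepository.MODIFICATION_FILEMODEL.format(table_id="users")
# -> 'users_modification.csv'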
plesna/storage/metadata_repository/metadata_repository.py (new file, 78 lines)

@@ -0,0 +1,78 @@
import abc
from datetime import datetime

from pydantic import BaseModel

from plesna.models.flux import FluxMetaData


class ModificationMetaData(BaseModel):
    datetime: datetime
    flux_id: str


class ExecutionMetaData(BaseModel):
    datetime: datetime
    output: FluxMetaData

    def to_flat_dict(self):
        # Flatten to csv-writable scalars: a POSIX timestamp and a JSON string
        return {"datetime": self.datetime.timestamp(), "output": self.output.model_dump_json()}


class MetaDataRepository:
    """Object that stores metadata about fluxes, schemas and tables"""

    def __init__(self):
        pass

    @abc.abstractmethod
    def fluxes(self) -> list[str]:
        """List fluxes' ids"""
        raise NotImplementedError

    @abc.abstractmethod
    def add_flux(self, flux_id: str) -> str:
        """Register a new flux"""
        raise NotImplementedError

    @abc.abstractmethod
    def register_flux_execution(self, flux_id: str, metadata: ExecutionMetaData) -> str:
        """Record a flux execution"""
        raise NotImplementedError

    @abc.abstractmethod
    def flux(self, schema_id: str) -> ExecutionMetaData:
        """Get the flux's last execution metadata"""
        raise NotImplementedError

    @abc.abstractmethod
    def flux_logs(self, schema_id: str) -> list[ExecutionMetaData]:
        """Get all the flux execution metadata"""
        raise NotImplementedError

    @abc.abstractmethod
    def tables(self) -> list[str]:
        """List all tables' ids"""
        raise NotImplementedError

    @abc.abstractmethod
    def add_table(self, table_id: str) -> str:
        """Register a new table"""
        raise NotImplementedError

    @abc.abstractmethod
    def register_table_modification(self, table_id: str, metadata: ModificationMetaData) -> str:
        """Record a table modification"""
        raise NotImplementedError

    @abc.abstractmethod
    def table(self, table_id: str) -> ModificationMetaData:
        """Get the table's last modification metadata"""
        raise NotImplementedError

    @abc.abstractmethod
    def table_logs(self, table_id: str) -> list[ModificationMetaData]:
        """Get all the table's modification metadata"""
        raise NotImplementedError
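To make the flattening concrete, here is what to_flat_dict yields for the values used in the tests at the end of this compare (a sketch; the timestamp value assumes a UTC+1 local timezone, see the closing note):

from datetime import datetime
from plesna.models.flux import FluxMetaData

meta = ExecutionMetaData(
    datetime=datetime(2023, 3, 15, 14, 30),
    output=FluxMetaData(data={"truc": "machin"}),
)
meta.to_flat_dict()
# -> {'datetime': 1678887000.0, 'output': '{"data":{"truc":"machin"}}'}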
DataPlateform tests:

@@ -6,13 +6,13 @@ import pytest
 from plesna.dataplatform import DataPlateform
 from plesna.models.graphs import Edge, EdgeOnSet, Node
 from plesna.models.flux import Flux, Transformation
-from plesna.storage.repository.fs_repository import FSRepository
+from plesna.storage.data_repository.fs_data_repository import FSDataRepository

 FIXTURE_DIR = Path(__file__).parent.parent / Path("raw_datas")


 @pytest.fixture
-def repository(tmp_path) -> FSRepository:
+def repository(tmp_path) -> FSDataRepository:
     example_src = FIXTURE_DIR
     assert example_src.exists()

@@ -24,11 +24,11 @@ def repository(tmp_path) -> FSRepository:
     silver_path = Path(tmp_path) / "silver"
     silver_path.mkdir()

-    return FSRepository("test", "test", tmp_path)
+    return FSDataRepository("test", "test", tmp_path)


 def test_add_repository(
-    repository: FSRepository,
+    repository: FSDataRepository,
 ):
     dp = DataPlateform()
     dp.add_repository(repository)
@@ -39,7 +39,7 @@ def test_add_repository(


 @pytest.fixture
-def copy_flux(repository: FSRepository) -> Flux:
+def copy_flux(repository: FSDataRepository) -> Flux:
     raw_username = [repository.table("test-raw-username")]
     bronze_username = [repository.table("test-bronze-username")]

@@ -62,7 +62,7 @@ def copy_flux(repository: FSRepository) -> Flux:


 @pytest.fixture
-def foo_flux(repository: FSRepository) -> Flux:
+def foo_flux(repository: FSDataRepository) -> Flux:
     src = [
         repository.table("test-raw-username"),
         repository.table("test-raw-recovery"),
@@ -84,7 +84,7 @@ def foo_flux(repository: FSRepository) -> Flux:
     return flux


-def test_add_flux(repository: FSRepository, copy_flux: Flux):
+def test_add_flux(repository: FSDataRepository, copy_flux: Flux):
     dataplatform = DataPlateform()
     dataplatform.add_repository(repository)

@@ -99,7 +99,7 @@ def test_add_flux(repository: FSRepository, copy_flux: Flux):

 @pytest.fixture
 def dataplatform(
-    repository: FSRepository,
+    repository: FSDataRepository,
     foo_flux: Flux,
     copy_flux: Flux,
 ) -> DataPlateform:
FSDataRepository tests:

@@ -3,7 +3,7 @@ from pathlib import Path

 import pytest

-from plesna.storage.repository.fs_repository import FSRepository
+from plesna.storage.data_repository.fs_data_repository import FSDataRepository

 FIXTURE_DIR = Path(__file__).parent.parent / Path("./raw_datas/")

@@ -20,7 +20,7 @@ def location(tmp_path):


 def test_init(location):
-    repo = FSRepository("example", "example", location)
+    repo = FSDataRepository("example", "example", location)
     assert repo.ls() == [
         "schema",
     ]
@@ -44,8 +44,8 @@ def test_init(location):


 @pytest.fixture
-def repository(location) -> FSRepository:
-    return FSRepository("repo_id", "example", location)
+def repository(location) -> FSDataRepository:
+    return FSDataRepository("repo_id", "example", location)


 def test_list_schemas(repository):
tests/storage/test_fs_metadata_repository.py (new file, 54 lines)

@@ -0,0 +1,54 @@
from datetime import datetime
import shutil
from pathlib import Path

import pytest

from plesna.storage.metadata_repository.fs_metadata_repository import FSMetaDataRepository


@pytest.fixture
def location(tmp_path):
    catalogpath = tmp_path / "catalog"
    catalogpath.mkdir()

    return catalogpath


def test_init(location):
    repo = FSMetaDataRepository(location)


@pytest.fixture
def metadata_repository(location) -> FSMetaDataRepository:
    return FSMetaDataRepository(location)


def test_add_flux(location, metadata_repository):
    flux_id = "my_flux"
    metadata_repository.add_flux(flux_id)

    metadata_filepath = location / metadata_repository.EXECUTION_FILEMODEL.format(flux_id=flux_id)
    assert metadata_filepath.exists()

    with open(metadata_filepath, "r") as csvfile:
        content = csvfile.read()
    assert content == "datetime,output\n"


def test_register_flux_execution(location, metadata_repository):
    flux_id = "my_flux"
    metadata_repository.add_flux(flux_id)

    metadata_repository.register_flux_execution(
        flux_id,
        metadata={
            "datetime": datetime(2023, 3, 15, 14, 30),
            "output": {"data": {"truc": "machin"}},
        },
    )

    metadata_filepath = location / metadata_repository.EXECUTION_FILEMODEL.format(flux_id=flux_id)
    with open(metadata_filepath, "r") as csvfile:
        content = csvfile.read()
    assert content == 'datetime,output\n1678887000.0,"{""data"":{""truc"":""machin""}}"\n'
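One caveat on the expected constant in the last assertion: datetime.timestamp() interprets a naive datetime in the local timezone, so 1678887000.0 (2023-03-15 13:30:00 UTC) matches datetime(2023, 3, 15, 14, 30) only when the tests run in a UTC+1 zone:

from datetime import datetime, timezone

# The asserted epoch value decodes to 13:30 UTC, i.e. 14:30 at UTC+1:
datetime.fromtimestamp(1678887000.0, tz=timezone.utc)
# -> datetime(2023, 3, 15, 13, 30, tzinfo=timezone.utc)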