Compare commits

11 Commits

12 changed files with 454 additions and 78 deletions

View File

@@ -5,7 +5,7 @@ from plesna.graph.graph_set import GraphSet
from plesna.models.flux import Flux, FluxMetaData
from plesna.models.graphs import Node
from plesna.models.libs.flux_graph import flux_to_edgeonset
from plesna.storage.repository.repository import Repository
from plesna.storage.data_repository.data_repository import DataRepository
class DataPlateformError(Exception):
@@ -18,7 +18,7 @@ class DataPlateform:
self._fluxes = {}
self._repositories = {}
def add_repository(self, repository: Repository) -> str:
def add_repository(self, repository: DataRepository) -> str:
if repository.id in self._repositories:
raise DataPlateformError("The repository {repository.id} already exists")
@@ -29,27 +29,31 @@ class DataPlateform:
def repositories(self) -> list[str]:
return list(self._repositories)
def repository(self, id: str) -> Repository:
def repository(self, id: str) -> DataRepository:
return self._repositories[id]
def add_flux(self, name: str, flux: Flux) -> str:
if name in self._fluxes:
raise DataPlateformError("The flux {name} already exists")
def is_valid_flux(self, flux: Flux) -> bool:
return True
self._fluxes[name] = flux
return name
def add_flux(self, flux: Flux) -> str:
if flux.id in self._fluxes:
raise DataPlateformError("The flux {flux} already exists")
assert self.is_valid_flux(flux)
self._fluxes[flux.id] = flux
return flux.id
@property
def fluxes(self) -> list[str]:
return list(self._fluxes)
def flux(self, name: str) -> Flux:
return self._fluxes[name]
def flux(self, flux_id: str) -> Flux:
return self._fluxes[flux_id]
def execute_flux(self, name: str) -> FluxMetaData:
if name not in self._fluxes:
raise DataPlateformError("The flux {name} is not registered")
return consume_flux(self._fluxes[name])
def execute_flux(self, flux_id: str) -> FluxMetaData:
if flux_id not in self._fluxes:
raise DataPlateformError("The flux {flux_id} is not registered")
return consume_flux(self._fluxes[flux_id])
def graphset(
self,

View File

@@ -3,7 +3,7 @@ import abc
from plesna.models.storage import Partition, Schema, Table
class Repository:
class DataRepository:
def __init__(self, id: str, name: str):
self._id = id
self._name = name

View File

@@ -3,8 +3,8 @@ from pathlib import Path
from pydantic import BaseModel, computed_field
from plesna.libs.string_tools import extract_values_from_pattern
from plesna.models.storage import Partition, Schema, Table
from plesna.storage.repository.repository import Repository
from plesna.models.storage import Schema, Table
from plesna.storage.data_repository.data_repository import DataRepository
class FSTable(BaseModel):
@@ -58,8 +58,8 @@ class FSRepositoryError(ValueError):
pass
class FSRepository(Repository):
"""Repository based on files tree structure
class FSDataRepository(DataRepository):
"""Data Repository based on files tree structure
- first level: schemas
- second level: tables

View File

@@ -0,0 +1,132 @@
from pathlib import Path
from datetime import datetime
import csv
import json
from typing import Iterable
from plesna.libs.string_tools import StringToolsError, extract_values_from_pattern
from plesna.storage.metadata_repository.metadata_repository import (
ExecutionLog,
MetaDataRepository,
ModificationLog,
)
class FSMetaDataRepositoryError(ValueError):
pass
class FSMetaDataRepository(MetaDataRepository):
"""MetaData Repository based on csv files
Files organisations: executions and modifications are stored in csv file according to ***_FILEMODEL
"""
OBJECTS = {
"flux": {"filemodel": "{id}_execution.csv", "logmodel": ExecutionLog},
"table": {"filemodel": "{id}_execution.csv", "logmodel": ModificationLog},
}
def __init__(self, basepath: str):
super().__init__()
self._basepath = Path(basepath)
assert self._basepath.exists()
def get_things(self, what: str) -> list[str]:
"""List all ids for 'what'"""
whats = []
for filepath in self._basepath.iterdir():
try:
founded = extract_values_from_pattern(
self.OBJECTS[what]["filemodel"], filepath.name
)
except StringToolsError:
pass
else:
whats.append(founded["id"])
return whats
def fluxes(self) -> list[str]:
"""List fluxes's ids"""
return self.get_things(what="flux")
def tables(
self,
) -> list[str]:
"""List all table's ids"""
return self.get_things(what="table")
def _add_thing(self, what: str, id: str) -> str:
"""Add the new things 'what'"""
filepath = self._basepath / self.OBJECTS[what]["filemodel"].format(id=id)
filepath.touch()
with open(filepath, "a") as csvfile:
writer = csv.DictWriter(
csvfile, fieldnames=self.OBJECTS[what]["logmodel"].model_fields.keys()
)
writer.writeheader()
return id
def add_flux(self, flux_id: str) -> str:
"""Get the flux metadata"""
return self._add_thing(what="flux", id=flux_id)
def add_table(self, table_id: str) -> str:
"""Get the table metadata"""
return self._add_thing(what="table", id=table_id)
def _register_things_event(self, what: str, id: str, dt: datetime, event: dict) -> ExecutionLog:
filepath = self._basepath / self.OBJECTS[what]["filemodel"].format(id=id)
if not filepath.exists:
raise FSMetaDataRepositoryError(f"The {what} {id} hasn't been added yet.")
metadata_ = self.OBJECTS[what]["logmodel"](datetime=dt, **event)
with open(filepath, "a") as csvfile:
writer = csv.DictWriter(
csvfile, fieldnames=self.OBJECTS[what]["logmodel"].model_fields.keys()
)
writer.writerow(metadata_.to_flat_dict())
return metadata_
def register_flux_execution(self, flux_id: str, dt: datetime, output: dict) -> ExecutionLog:
"""Get the flux metadata"""
return self._register_things_event("flux", flux_id, dt, {"output": {"data": output}})
def register_table_modification(self, table_id: str, dt: datetime, flux_id: str) -> str:
"""Get the table metadata"""
return self._register_things_event("table", table_id, dt, {"flux_id": flux_id})
def _get_all_log(self, what: str, id: str) -> Iterable[dict]:
"""Generate log dict from history"""
filepath = self._basepath / self.OBJECTS[what]["filemodel"].format(id=id)
if not filepath.exists:
raise FSMetaDataRepositoryError(f"The {what} {id} hasn't been added yet.")
with open(filepath, "r") as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
yield row
def flux_logs(self, flux_id: str) -> list[ExecutionLog]:
"""Get all flux logs"""
logs = []
for logline in self._get_all_log("flux", flux_id):
logline["output"] = json.loads(logline["output"])
logs.append(self.OBJECTS["flux"]["logmodel"](**logline))
return logs
def flux(self, flux_id: str) -> ExecutionLog:
"""Get the last flux log"""
return max(self.flux_logs(flux_id), key=lambda l: l.datetime)
def table_logs(self, table_id: str) -> list[ModificationLog]:
"""Get all table's modification metadatas"""
return [ModificationLog(**log) for log in self._get_all_log("table", table_id)]
def table(self, table_id: str) -> ModificationLog:
"""Get the last table's modification metadatas"""
return max(self.table_logs(table_id), key=lambda l: l.datetime)

View File

@@ -0,0 +1,81 @@
import abc
from datetime import datetime
from pydantic import BaseModel
from plesna.models.flux import FluxMetaData
class ModificationLog(BaseModel):
datetime: datetime
flux_id: str
def to_flat_dict(self):
return {"datetime": self.datetime.isoformat(), "flux_id": self.flux_id}
class ExecutionLog(BaseModel):
datetime: datetime
output: FluxMetaData
def to_flat_dict(self):
return {"datetime": self.datetime.isoformat(), "output": self.output.model_dump_json()}
class MetaDataRepository:
"""Object that stores metadata about flux, schema, tables"""
def __init__(self):
pass
@abc.abstractmethod
def fluxes(self) -> list[str]:
"""List fluxes's ids"""
raise NotImplementedError
@abc.abstractmethod
def add_flux(self, flux_id: str) -> str:
"""Get the flux metadata"""
raise NotImplementedError
@abc.abstractmethod
def register_flux_execution(self, flux_id: str, dt: datetime, metadata: dict) -> str:
"""Get the flux metadata"""
raise NotImplementedError
@abc.abstractmethod
def flux(self, schema_id: str) -> ExecutionLog:
"""Get the flux last execution metadata"""
raise NotImplementedError
@abc.abstractmethod
def flux_logs(self, schema_id: str) -> list[ExecutionLog]:
"""Get all the flux execution metadata"""
raise NotImplementedError
@abc.abstractmethod
def tables(
self,
) -> list[str]:
"""List all table's ids"""
raise NotImplementedError
@abc.abstractmethod
def add_table(self, table_id: str) -> str:
"""Get the table metadata"""
raise NotImplementedError
@abc.abstractmethod
def register_table_modification(self, table_id: str, dt: datetime, metadata: dict) -> str:
"""Get the table metadata"""
raise NotImplementedError
@abc.abstractmethod
def table(self, table_id: str) -> ModificationLog:
"""Get the last table's modification metadatas"""
raise NotImplementedError
@abc.abstractmethod
def table_logs(self, table_id: str) -> list[ModificationLog]:
"""Get all table's modification metadatas"""
raise NotImplementedError

16
pyproject.toml Normal file
View File

@@ -0,0 +1,16 @@
[project]
name = "plesna"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"ruff>=0.8.5",
]
[tool.ruff]
line-length = 100
indent-width = 4
[tool.ruff.lint]
select = ["E", "F"]
ignore = ["F401"]

View File

@@ -6,13 +6,13 @@ import pytest
from plesna.dataplatform import DataPlateform
from plesna.models.graphs import Edge, EdgeOnSet, Node
from plesna.models.flux import Flux, Transformation
from plesna.storage.repository.fs_repository import FSRepository
from plesna.storage.data_repository.fs_data_repository import FSDataRepository
FIXTURE_DIR = Path(__file__).parent.parent / Path("raw_datas")
@pytest.fixture
def repository(tmp_path) -> FSRepository:
def repository(tmp_path) -> FSDataRepository:
example_src = FIXTURE_DIR
assert example_src.exists()
@@ -24,11 +24,11 @@ def repository(tmp_path) -> FSRepository:
silver_path = Path(tmp_path) / "silver"
silver_path.mkdir()
return FSRepository("test", "test", tmp_path)
return FSDataRepository("test", "test", tmp_path)
def test_add_repository(
repository: FSRepository,
repository: FSDataRepository,
):
dp = DataPlateform()
dp.add_repository(repository)
@@ -39,7 +39,7 @@ def test_add_repository(
@pytest.fixture
def copy_flux(repository: FSRepository) -> Flux:
def copy_flux(repository: FSDataRepository) -> Flux:
raw_username = [repository.table("test-raw-username")]
bronze_username = [repository.table("test-bronze-username")]
@@ -62,7 +62,7 @@ def copy_flux(repository: FSRepository) -> Flux:
@pytest.fixture
def foo_flux(repository: FSRepository) -> Flux:
def foo_flux(repository: FSDataRepository) -> Flux:
src = [
repository.table("test-raw-username"),
repository.table("test-raw-recovery"),
@@ -84,22 +84,22 @@ def foo_flux(repository: FSRepository) -> Flux:
return flux
def test_add_flux(repository: FSRepository, copy_flux: Flux):
def test_add_flux(repository: FSDataRepository, copy_flux: Flux, foo_flux: Flux):
dataplatform = DataPlateform()
dataplatform.add_repository(repository)
dataplatform.add_flux(name="copy_flux", flux=copy_flux)
dataplatform.add_flux(flux=copy_flux)
assert dataplatform.fluxes == ["copy_flux"]
dataplatform.add_flux(name="copy_flux_bis", flux=copy_flux)
assert dataplatform.fluxes == ["copy_flux", "copy_flux_bis"]
dataplatform.add_flux(flux=foo_flux)
assert dataplatform.fluxes == ["copy_flux", "foo_flux"]
assert dataplatform.flux("copy_flux") == copy_flux
assert dataplatform.flux("copy_flux_bis") == copy_flux
assert dataplatform.flux("foo_flux") == foo_flux
@pytest.fixture
def dataplatform(
repository: FSRepository,
repository: FSDataRepository,
foo_flux: Flux,
copy_flux: Flux,
) -> DataPlateform:
@@ -107,8 +107,8 @@ def dataplatform(
dp.add_repository(repository)
dp.add_flux("foo", foo_flux)
dp.add_flux("raw_brz_copy_username", copy_flux)
dp.add_flux(foo_flux)
dp.add_flux(copy_flux)
return dp
@@ -269,12 +269,12 @@ def test_content_from_graph_arguments(dataplatform: DataPlateform):
def test_execute_flux(dataplatform: DataPlateform):
meta = dataplatform.execute_flux("foo")
meta = dataplatform.execute_flux("foo_flux")
assert meta.data == {"who": "foo"}
assert dataplatform.repository("test").schema("test-bronze").tables == []
meta = dataplatform.execute_flux("raw_brz_copy_username")
meta = dataplatform.execute_flux("copy_flux")
assert meta.data == {"src_size": 283, "tgt_size": 283}
assert dataplatform.repository("test").schema("test-bronze").tables == ["test-bronze-username"]

View File

@@ -1,39 +0,0 @@
from pathlib import Path
import pytest
from plesna.dataplatform import DataPlateform
from plesna.datastore.fs_datacatalogue import FSDataCatalogue
FIXTURE_DIR = Path(__file__).parent / Path("raw_data")
@pytest.fixture
def raw_catalogue(tmp_path):
raw_path = Path(tmp_path) / "raw"
return FSDataCatalogue(raw_path)
@pytest.fixture
def bronze_catalogue(tmp_path):
bronze_path = Path(tmp_path) / "bronze"
return FSDataCatalogue(bronze_path)
@pytest.fixture
def silver_catalogue(tmp_path):
silver_path = Path(tmp_path) / "silver"
return FSDataCatalogue(silver_path)
@pytest.fixture
def dataplateform(
raw_catalogue: FSDataCatalogue,
bronze_catalogue: FSDataCatalogue,
silver_catalogue: FSDataCatalogue,
):
dp = DataPlateform()
dp.add_datacatalague("raw", raw_catalogue)
dp.add_datacatalague("bronze", bronze_catalogue)
dp.add_datacatalague("silver", silver_catalogue)
pass

View File

@@ -3,7 +3,7 @@ from pathlib import Path
import pytest
from plesna.storage.repository.fs_repository import FSRepository
from plesna.storage.data_repository.fs_data_repository import FSDataRepository
FIXTURE_DIR = Path(__file__).parent.parent / Path("./raw_datas/")
@@ -20,7 +20,7 @@ def location(tmp_path):
def test_init(location):
repo = FSRepository("example", "example", location)
repo = FSDataRepository("example", "example", location)
assert repo.ls() == [
"schema",
]
@@ -44,8 +44,8 @@ def test_init(location):
@pytest.fixture
def repository(location) -> FSRepository:
return FSRepository("repo_id", "example", location)
def repository(location) -> FSDataRepository:
return FSDataRepository("repo_id", "example", location)
def test_list_schemas(repository):

View File

@@ -0,0 +1,182 @@
from datetime import datetime
import shutil
from pathlib import Path
import pytest
from plesna.models.flux import FluxMetaData
from plesna.storage.metadata_repository.fs_metadata_repository import FSMetaDataRepository
from plesna.storage.metadata_repository.metadata_repository import ExecutionLog, ModificationLog
@pytest.fixture
def location(tmp_path):
catalogpath = tmp_path / "catalog"
catalogpath.mkdir()
return catalogpath
def test_init(location):
repo = FSMetaDataRepository(location)
@pytest.fixture
def metadata_repository(location) -> FSMetaDataRepository:
return FSMetaDataRepository(location)
def test_add_flux(location, metadata_repository):
flux_id = "my_flux"
metadata_repository.add_flux(flux_id)
metadata_filepath = location / metadata_repository.OBJECTS["flux"]["filemodel"].format(
id=flux_id
)
assert metadata_filepath.exists()
with open(metadata_filepath, "r") as csvfile:
content = csvfile.read()
assert content == "datetime,output\n"
def test_add_and_list_fluxes(metadata_repository):
flux_ids = ["my_flux", "flux2", "blahblah"]
for f in flux_ids:
metadata_repository.add_flux(f)
assert metadata_repository.fluxes() == flux_ids
def test_register_flux_execution(location, metadata_repository):
flux_id = "my_flux"
metadata_repository.add_flux(flux_id)
metadata_repository.register_flux_execution(
flux_id,
datetime(2023, 3, 15, 14, 30),
output={
"truc": "machin",
},
)
metadata_filepath = location / metadata_repository.OBJECTS["flux"]["filemodel"].format(
id=flux_id
)
with open(metadata_filepath, "r") as csvfile:
content = csvfile.read()
assert (
content == 'datetime,output\n2023-03-15T14:30:00,"{""data"":{""truc"":""machin""}}"\n'
)
def test_register_and_get_exec_logs(metadata_repository):
flux_id = "my_flux"
metadata_repository.add_flux(flux_id)
metadata_repository.register_flux_execution(
flux_id,
datetime(2023, 3, 15, 14, 30),
output={"truc": "machin"},
)
metadata_repository.register_flux_execution(
flux_id,
datetime(2024, 3, 15, 14, 30),
output={
"truc": "chose",
},
)
logs = metadata_repository.flux_logs(flux_id)
assert logs == [
ExecutionLog(
datetime=datetime(2023, 3, 15, 14, 30),
output=FluxMetaData(data={"truc": "machin"}),
),
ExecutionLog(
datetime=datetime(2024, 3, 15, 14, 30),
output=FluxMetaData(data={"truc": "chose"}),
),
]
def test_register_and_get_last_exec_log(metadata_repository):
flux_id = "my_flux"
metadata_repository.add_flux(flux_id)
metadata_repository.register_flux_execution(
flux_id,
datetime(2023, 3, 15, 14, 30),
output={"truc": "machin"},
)
metadata_repository.register_flux_execution(
flux_id,
datetime(2024, 3, 15, 14, 30),
output={
"truc": "chose",
},
)
logs = metadata_repository.flux(flux_id)
assert logs == ExecutionLog(
datetime=datetime(2024, 3, 15, 14, 30),
output=FluxMetaData(data={"truc": "chose"}),
)
def test_add_and_list_tables(metadata_repository):
table_ids = ["my_table", "table2", "blahblah"]
for f in table_ids:
metadata_repository.add_table(f)
assert metadata_repository.tables() == table_ids
def test_register_table_modification(location, metadata_repository):
table_id = "my_table"
flux_id = "my_flux"
metadata_repository.add_table(table_id)
metadata_repository.register_table_modification(
table_id, datetime(2023, 3, 15, 14, 30), flux_id
)
metadata_filepath = location / metadata_repository.OBJECTS["table"]["filemodel"].format(
id=table_id
)
with open(metadata_filepath, "r") as csvfile:
content = csvfile.read()
assert content == "datetime,flux_id\n2023-03-15T14:30:00,my_flux\n"
def test_register_and_get_mod_logs(metadata_repository):
table_id = "my_table"
flux_id = "my_flux"
metadata_repository.add_table(table_id)
metadata_repository.register_table_modification(
table_id, datetime(2023, 3, 15, 14, 30), flux_id
)
metadata_repository.register_table_modification(
table_id, datetime(2024, 3, 15, 14, 30), flux_id
)
logs = metadata_repository.table_logs(table_id)
assert logs == [
ModificationLog(datetime=datetime(2023, 3, 15, 14, 30), flux_id=flux_id),
ModificationLog(datetime=datetime(2024, 3, 15, 14, 30), flux_id=flux_id),
]
def test_register_and_get_last_log(metadata_repository):
table_id = "my_table"
flux_id = "my_flux"
metadata_repository.add_table(table_id)
metadata_repository.register_table_modification(
table_id, datetime(2023, 3, 15, 14, 30), flux_id
)
metadata_repository.register_table_modification(
table_id, datetime(2024, 3, 15, 14, 30), flux_id
)
logs = metadata_repository.table(table_id)
assert logs == ModificationLog(datetime=datetime(2024, 3, 15, 14, 30), flux_id=flux_id)