Compare commits

15 Commits

17 changed files with 644 additions and 102 deletions

View File

@@ -1,9 +1,11 @@
+from collections.abc import Callable
 from plesna.compute.consume_flux import consume_flux
-from plesna.graph.graph import Graph, Node
+from plesna.graph.graph import Graph
-from plesna.graph.graph_set import EdgeOnSet, GraphSet
+from plesna.graph.graph_set import GraphSet
 from plesna.models.flux import Flux, FluxMetaData
+from plesna.models.graphs import Node
 from plesna.models.libs.flux_graph import flux_to_edgeonset
-from plesna.storage.repository.repository import Repository
+from plesna.storage.data_repository.data_repository import DataRepository


 class DataPlateformError(Exception):
@@ -16,7 +18,7 @@ class DataPlateform:
         self._fluxes = {}
         self._repositories = {}

-    def add_repository(self, repository: Repository) -> str:
+    def add_repository(self, repository: DataRepository) -> str:
         if repository.id in self._repositories:
             raise DataPlateformError(f"The repository {repository.id} already exists")
@@ -27,42 +29,65 @@ class DataPlateform:
     def repositories(self) -> list[str]:
         return list(self._repositories)

-    def repository(self, id: str) -> Repository:
+    def repository(self, id: str) -> DataRepository:
         return self._repositories[id]

-    def add_flux(self, name: str, flux: Flux) -> str:
-        if name in self._fluxes:
-            raise DataPlateformError(f"The flux {name} already exists")
-
-        self._fluxes[name] = flux
-        return name
+    def is_valid_flux(self, flux: Flux) -> bool:
+        return True
+
+    def add_flux(self, flux: Flux) -> str:
+        if flux.id in self._fluxes:
+            raise DataPlateformError(f"The flux {flux} already exists")
+
+        assert self.is_valid_flux(flux)
+
+        self._fluxes[flux.id] = flux
+        return flux.id
     @property
     def fluxes(self) -> list[str]:
         return list(self._fluxes)

-    def flux(self, name: str) -> Flux:
-        return self._fluxes[name]
+    def flux(self, flux_id: str) -> Flux:
+        return self._fluxes[flux_id]

-    def execute_flux(self, name: str) -> FluxMetaData:
-        if name not in self._fluxes:
-            raise DataPlateformError(f"The flux {name} is not registered")
-        return consume_flux(self._fluxes[name])
+    def execute_flux(self, flux_id: str) -> FluxMetaData:
+        if flux_id not in self._fluxes:
+            raise DataPlateformError(f"The flux {flux_id} is not registered")
+        return consume_flux(self._fluxes[flux_id])
-    @property
-    def graphset(self) -> GraphSet:
+    def graphset(
+        self,
+        name_flux: Callable = lambda flux: flux.id,
+        meta_flux: Callable = lambda _: {},
+        name_table: Callable = lambda table: table.id,
+        meta_table: Callable = lambda _: {},
+    ) -> GraphSet:
         graphset = GraphSet()
         for flux in self._fluxes.values():
-            edge = flux_to_edgeonset(flux)
+            edge = flux_to_edgeonset(flux, name_flux, meta_flux, name_table, meta_table)
             graphset.append(edge)
         return graphset

-    @property
-    def graph(self) -> Graph:
-        graph = Graph()
+    def graph(
+        self,
+        name_flux: Callable = lambda flux: flux.id,
+        meta_flux: Callable = lambda _: {},
+        name_table: Callable = lambda table: table.id,
+        meta_table: Callable = lambda _: {},
+    ) -> Graph:
+        """Get the graph of fluxes and tables
+
+        :param name_flux: function on a flux which names its edge
+        :param meta_flux: function on a flux which attaches metadata to its edge
+        :param name_table: function on a table which names its node
+        :param meta_table: function on a table which attaches metadata to its node
+        """
+        graph = self.graphset(name_flux, meta_flux, name_table, meta_table).to_graph()
         for repo in self._repositories.values():
             for schema in repo.schemas():
                 for table in repo.tables(schema):
-                    graph.add_node(Node(name=table))
+                    t = repo.table(table)
+                    graph.add_node(Node(name=name_table(t), metadata=meta_table(t)))
         return graph
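
With this change the two former properties become parameterised methods: callers inject naming and metadata callables. A minimal sketch of the new call style, assuming `dp` is a DataPlateform already holding a repository and some fluxes (identifiers below are illustrative, not part of the diff):

# Sketch: label edges with the flux id and tag nodes with their partitions.
graph = dp.graph(
    name_flux=lambda flux: f"flux-{flux.id}",                    # edge label
    meta_flux=lambda flux: {"name": flux.name},                  # edge metadata
    meta_table=lambda table: {"partitions": table.partitions},   # node metadata
)
print(graph.nodes, graph.edges)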

View File

@@ -1,3 +1,4 @@
+from typing import Set
 from pydantic import BaseModel
 from functools import reduce
 from plesna.models.graphs import Node, Edge
@@ -5,8 +6,8 @@ from plesna.models.graphs import Node, Edge
 class Graph:
     def __init__(self, nodes: list[Node] = [], edges: list[Edge] = []):
-        self._edges = []
-        self._nodes = set()
+        self._edges: list[Edge] = []
+        self._nodes: Set[Node] = set()
         self.add_edges(edges)
         self.add_nodes(nodes)

View File

@@ -14,6 +14,10 @@ class GraphSet:
         self._node_sets.add(frozenset(edge.sources))
         self._node_sets.add(frozenset(edge.targets))

+    @property
+    def edges(self) -> list[EdgeOnSet]:
+        return self._edges
+
     @property
     def node_sets(self) -> Set[frozenset]:
         return self._node_sets
@@ -24,7 +28,7 @@ class GraphSet:
             graph.add_nodes(node_set)

         for edge in self._edges:
             flatten_edge = [
-                Edge(arrow=edge.arrow, source=s, target=t, edge_kwrds=edge.edge_kwrds)
+                Edge(arrow=edge.arrow, source=s, target=t, metadata=edge.metadata)
                 for (s, t) in product(edge.sources, edge.targets)
             ]
             graph.add_edges(flatten_edge)
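
The hunk above renames `edge_kwrds` to `metadata` in the flattening step, where each EdgeOnSet expands into the cartesian product of its sources and targets. An illustrative sketch (values made up):

# One EdgeOnSet with 2 sources and 1 target flattens into 2 Edges, metadata copied through.
e = EdgeOnSet(arrow="f", sources=[Node(name="a"), Node(name="b")], targets=[Node(name="c")], metadata={"k": 1})
gs = GraphSet()
gs.append(e)
# gs.to_graph() then contains:
#   Edge(arrow="f", source=Node(name="a"), target=Node(name="c"), metadata={"k": 1})
#   Edge(arrow="f", source=Node(name="b"), target=Node(name="c"), metadata={"k": 1})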

View File

@@ -3,6 +3,7 @@ from pydantic import BaseModel
 class Node(BaseModel):
     name: str
+    metadata: dict = {}

     def __hash__(self):
         return hash(self.name)
@@ -12,11 +13,11 @@ class Edge(BaseModel):
     arrow: str
     source: Node
     target: Node
-    edge_kwrds: dict = {}
+    metadata: dict = {}


 class EdgeOnSet(BaseModel):
     arrow: str
     sources: list[Node]
     targets: list[Node]
-    edge_kwrds: dict = {}
+    metadata: dict = {}

View File

@@ -0,0 +1,29 @@
from collections.abc import Callable

from plesna.models.flux import Flux
from plesna.models.graphs import EdgeOnSet, Node


def flux_to_edgeonset(
    flux: Flux,
    name_flux: Callable = lambda flux: flux.id,
    meta_flux: Callable = lambda _: {},
    name_table: Callable = lambda table: table.id,
    meta_table: Callable = lambda _: {},
) -> EdgeOnSet:
    """Convert a flux to an EdgeOnSet

    :param flux: the flux to convert
    :param name_flux: function on the flux which names the resulting arrow
    :param meta_flux: function on the flux which returns a dict to store in the metadata field
    :param name_table: function on a table which names its node
    :param meta_table: function on a table which returns the metadata of its node
    """
    sources = [Node(name=name_table(s), metadata=meta_table(s)) for s in flux.sources]
    targets = [Node(name=name_table(t), metadata=meta_table(t)) for t in flux.targets]
    return EdgeOnSet(
        arrow=name_flux(flux),
        sources=sources,
        targets=targets,
        metadata=meta_flux(flux),
    )
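
With the default callables the conversion is purely structural. A quick sketch, assuming `copy_flux` is an existing Flux (as in the tests further down; not part of the diff):

# Default callables name the arrow after flux.id and each node after its table's id.
edge = flux_to_edgeonset(copy_flux)
assert edge.arrow == copy_flux.id
assert edge.sources == [Node(name=t.id, metadata={}) for t in copy_flux.sources]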

View File

@@ -36,8 +36,9 @@ class Table(BaseModel):
     schema_id: str
     name: str
     value: str
-    partitions: list[str] = []
     datas: list[str]
+    partitions: list[str] = []
+    metadata: dict = {}


 class Partition(BaseModel):

View File

@@ -3,7 +3,7 @@ import abc
 from plesna.models.storage import Partition, Schema, Table


-class Repository:
+class DataRepository:
     def __init__(self, id: str, name: str):
         self._id = id
         self._name = name

View File

@@ -3,8 +3,8 @@ from pathlib import Path
 from pydantic import BaseModel, computed_field

 from plesna.libs.string_tools import extract_values_from_pattern
-from plesna.models.storage import Partition, Schema, Table
-from plesna.storage.repository.repository import Repository
+from plesna.models.storage import Schema, Table
+from plesna.storage.data_repository.data_repository import DataRepository


 class FSTable(BaseModel):
@@ -58,8 +58,8 @@ class FSRepositoryError(ValueError):
     pass


-class FSRepository(Repository):
-    """Repository based on a file-tree structure
+class FSDataRepository(DataRepository):
+    """Data repository based on a file-tree structure

     - first level: schemas
     - second level: tables

View File

@@ -0,0 +1,132 @@
from pathlib import Path
from datetime import datetime
import csv
import json
from typing import Iterable

from plesna.libs.string_tools import StringToolsError, extract_values_from_pattern
from plesna.storage.metadata_repository.metadata_repository import (
    ExecutionLog,
    MetaDataRepository,
    ModificationLog,
)


class FSMetaDataRepositoryError(ValueError):
    pass


class FSMetaDataRepository(MetaDataRepository):
    """MetaData repository based on CSV files

    File organisation: execution and modification logs are stored in CSV files
    named according to the "filemodel" patterns in OBJECTS.
    """
    OBJECTS = {
        "flux": {"filemodel": "{id}_execution.csv", "logmodel": ExecutionLog},
        "table": {"filemodel": "{id}_modification.csv", "logmodel": ModificationLog},
    }

    def __init__(self, basepath: str):
        super().__init__()
        self._basepath = Path(basepath)
        assert self._basepath.exists()

    def get_things(self, what: str) -> list[str]:
        """List all ids for 'what'"""
        whats = []
        for filepath in self._basepath.iterdir():
            try:
                found = extract_values_from_pattern(
                    self.OBJECTS[what]["filemodel"], filepath.name
                )
            except StringToolsError:
                pass
            else:
                whats.append(found["id"])
        return whats

    def fluxes(self) -> list[str]:
        """List fluxes' ids"""
        return self.get_things(what="flux")

    def tables(self) -> list[str]:
        """List all tables' ids"""
        return self.get_things(what="table")

    def _add_thing(self, what: str, id: str) -> str:
        """Add a new 'what' and initialise its log file with a header row"""
        filepath = self._basepath / self.OBJECTS[what]["filemodel"].format(id=id)
        filepath.touch()
        with open(filepath, "a") as csvfile:
            writer = csv.DictWriter(
                csvfile, fieldnames=self.OBJECTS[what]["logmodel"].model_fields.keys()
            )
            writer.writeheader()
        return id

    def add_flux(self, flux_id: str) -> str:
        """Register a new flux"""
        return self._add_thing(what="flux", id=flux_id)

    def add_table(self, table_id: str) -> str:
        """Register a new table"""
        return self._add_thing(what="table", id=table_id)

    def _register_things_event(self, what: str, id: str, dt: datetime, event: dict):
        filepath = self._basepath / self.OBJECTS[what]["filemodel"].format(id=id)
        if not filepath.exists():
            raise FSMetaDataRepositoryError(f"The {what} {id} hasn't been added yet.")

        metadata_ = self.OBJECTS[what]["logmodel"](datetime=dt, **event)
        with open(filepath, "a") as csvfile:
            writer = csv.DictWriter(
                csvfile, fieldnames=self.OBJECTS[what]["logmodel"].model_fields.keys()
            )
            writer.writerow(metadata_.to_flat_dict())
        return metadata_

    def register_flux_execution(self, flux_id: str, dt: datetime, output: dict) -> ExecutionLog:
        """Register a flux execution"""
        return self._register_things_event("flux", flux_id, dt, {"output": {"data": output}})

    def register_table_modification(self, table_id: str, dt: datetime, flux_id: str) -> ModificationLog:
        """Register a table modification"""
        return self._register_things_event("table", table_id, dt, {"flux_id": flux_id})

    def _get_all_log(self, what: str, id: str) -> Iterable[dict]:
        """Generate log dicts from the history file"""
        filepath = self._basepath / self.OBJECTS[what]["filemodel"].format(id=id)
        if not filepath.exists():
            raise FSMetaDataRepositoryError(f"The {what} {id} hasn't been added yet.")

        with open(filepath, "r") as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                yield row

    def flux_logs(self, flux_id: str) -> list[ExecutionLog]:
        """Get all flux logs"""
        logs = []
        for logline in self._get_all_log("flux", flux_id):
            logline["output"] = json.loads(logline["output"])
            logs.append(self.OBJECTS["flux"]["logmodel"](**logline))
        return logs

    def flux(self, flux_id: str) -> ExecutionLog:
        """Get the last flux log"""
        return max(self.flux_logs(flux_id), key=lambda log: log.datetime)

    def table_logs(self, table_id: str) -> list[ModificationLog]:
        """Get all of a table's modification logs"""
        return [ModificationLog(**log) for log in self._get_all_log("table", table_id)]

    def table(self, table_id: str) -> ModificationLog:
        """Get a table's last modification log"""
        return max(self.table_logs(table_id), key=lambda log: log.datetime)
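
A minimal end-to-end sketch of the repository above, assuming the base directory already exists (paths and ids illustrative, not part of the diff):

# Log one execution and read it back.
from datetime import datetime
from plesna.storage.metadata_repository.fs_metadata_repository import FSMetaDataRepository

repo = FSMetaDataRepository("/tmp/catalog")   # asserts the directory exists
repo.add_flux("my_flux")                      # creates my_flux_execution.csv with its header row
repo.register_flux_execution("my_flux", datetime(2023, 3, 15, 14, 30), output={"rows": 42})
last = repo.flux("my_flux")                   # ExecutionLog with the latest datetime
print(last.output.data)                       # {'rows': 42}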

View File

@@ -0,0 +1,81 @@
import abc
from datetime import datetime

from pydantic import BaseModel

from plesna.models.flux import FluxMetaData


class ModificationLog(BaseModel):
    datetime: datetime
    flux_id: str

    def to_flat_dict(self):
        return {"datetime": self.datetime.isoformat(), "flux_id": self.flux_id}


class ExecutionLog(BaseModel):
    datetime: datetime
    output: FluxMetaData

    def to_flat_dict(self):
        return {"datetime": self.datetime.isoformat(), "output": self.output.model_dump_json()}


class MetaDataRepository:
    """Object that stores metadata about fluxes, schemas and tables"""

    def __init__(self):
        pass

    @abc.abstractmethod
    def fluxes(self) -> list[str]:
        """List fluxes' ids"""
        raise NotImplementedError

    @abc.abstractmethod
    def add_flux(self, flux_id: str) -> str:
        """Register a new flux"""
        raise NotImplementedError

    @abc.abstractmethod
    def register_flux_execution(self, flux_id: str, dt: datetime, output: dict) -> ExecutionLog:
        """Register a flux execution"""
        raise NotImplementedError

    @abc.abstractmethod
    def flux(self, flux_id: str) -> ExecutionLog:
        """Get the flux's last execution log"""
        raise NotImplementedError

    @abc.abstractmethod
    def flux_logs(self, flux_id: str) -> list[ExecutionLog]:
        """Get all of a flux's execution logs"""
        raise NotImplementedError

    @abc.abstractmethod
    def tables(self) -> list[str]:
        """List all tables' ids"""
        raise NotImplementedError

    @abc.abstractmethod
    def add_table(self, table_id: str) -> str:
        """Register a new table"""
        raise NotImplementedError

    @abc.abstractmethod
    def register_table_modification(self, table_id: str, dt: datetime, flux_id: str) -> ModificationLog:
        """Register a table modification"""
        raise NotImplementedError

    @abc.abstractmethod
    def table(self, table_id: str) -> ModificationLog:
        """Get a table's last modification log"""
        raise NotImplementedError

    @abc.abstractmethod
    def table_logs(self, table_id: str) -> list[ModificationLog]:
        """Get all of a table's modification logs"""
        raise NotImplementedError
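
The to_flat_dict helpers define the CSV round-trip: datetimes become ISO strings and ExecutionLog.output is embedded as a JSON string, which FSMetaDataRepository.flux_logs parses back before rebuilding the model. A round-trip sketch (values illustrative):

import json
from datetime import datetime

log = ExecutionLog(datetime=datetime(2023, 3, 15, 14, 30), output=FluxMetaData(data={"truc": "machin"}))
flat = log.to_flat_dict()
# {'datetime': '2023-03-15T14:30:00', 'output': '{"data":{"truc":"machin"}}'}
assert ExecutionLog(datetime=flat["datetime"], output=json.loads(flat["output"])) == log  # pydantic coerces both fields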

pyproject.toml Normal file
View File

@@ -0,0 +1,16 @@
[project]
name = "plesna"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
    "ruff>=0.8.5",
]

[tool.ruff]
line-length = 100
indent-width = 4

[tool.ruff.lint]
select = ["E", "F"]
ignore = ["F401"]

View File

@@ -4,15 +4,15 @@ from pathlib import Path
 import pytest

 from plesna.dataplatform import DataPlateform
-from plesna.models.graphs import Node
+from plesna.models.graphs import Edge, EdgeOnSet, Node
 from plesna.models.flux import Flux, Transformation
-from plesna.storage.repository.fs_repository import FSRepository
+from plesna.storage.data_repository.fs_data_repository import FSDataRepository

 FIXTURE_DIR = Path(__file__).parent.parent / Path("raw_datas")


 @pytest.fixture
-def repository(tmp_path) -> FSRepository:
+def repository(tmp_path) -> FSDataRepository:
     example_src = FIXTURE_DIR
     assert example_src.exists()
@@ -24,11 +24,11 @@ def repository(tmp_path) -> FSRepository:
     silver_path = Path(tmp_path) / "silver"
     silver_path.mkdir()

-    return FSRepository("test", "test", tmp_path)
+    return FSDataRepository("test", "test", tmp_path)


 def test_add_repository(
-    repository: FSRepository,
+    repository: FSDataRepository,
 ):
     dp = DataPlateform()
     dp.add_repository(repository)
@@ -39,7 +39,7 @@ def test_add_repository(
 @pytest.fixture
-def copy_flux(repository: FSRepository) -> Flux:
+def copy_flux(repository: FSDataRepository) -> Flux:
     raw_username = [repository.table("test-raw-username")]
     bronze_username = [repository.table("test-bronze-username")]
@@ -62,7 +62,7 @@ def copy_flux(repository: FSRepository) -> Flux:
 @pytest.fixture
-def foo_flux(repository: FSRepository) -> Flux:
+def foo_flux(repository: FSDataRepository) -> Flux:
     src = [
         repository.table("test-raw-username"),
         repository.table("test-raw-recovery"),
@@ -84,22 +84,22 @@ def foo_flux(repository: FSRepository) -> Flux:
     return flux


-def test_add_flux(repository: FSRepository, copy_flux: Flux):
+def test_add_flux(repository: FSDataRepository, copy_flux: Flux, foo_flux: Flux):
     dataplatform = DataPlateform()
     dataplatform.add_repository(repository)

-    dataplatform.add_flux(name="copy_flux", flux=copy_flux)
+    dataplatform.add_flux(flux=copy_flux)
     assert dataplatform.fluxes == ["copy_flux"]

-    dataplatform.add_flux(name="copy_flux_bis", flux=copy_flux)
-    assert dataplatform.fluxes == ["copy_flux", "copy_flux_bis"]
+    dataplatform.add_flux(flux=foo_flux)
+    assert dataplatform.fluxes == ["copy_flux", "foo_flux"]

     assert dataplatform.flux("copy_flux") == copy_flux
-    assert dataplatform.flux("copy_flux_bis") == copy_flux
+    assert dataplatform.flux("foo_flux") == foo_flux


 @pytest.fixture
 def dataplatform(
-    repository: FSRepository,
+    repository: FSDataRepository,
     foo_flux: Flux,
     copy_flux: Flux,
 ) -> DataPlateform:
@@ -107,8 +107,8 @@ def dataplatform(
     dp.add_repository(repository)

-    dp.add_flux("foo", foo_flux)
-    dp.add_flux("raw_brz_copy_username", copy_flux)
+    dp.add_flux(foo_flux)
+    dp.add_flux(copy_flux)

     return dp
@@ -127,14 +127,8 @@ def test_listing_content(dataplatform: DataPlateform):
     ]


-def test_content_from_graph(dataplatform: DataPlateform):
-    assert dataplatform.graph.nodes == {
-        Node(name="test-raw-recovery", infos={}),
-        Node(name="test-raw-salary", infos={}),
-        Node(name="test-raw-username", infos={}),
-    }
-
-    assert dataplatform.graphset.node_sets == {
+def test_content_from_graphset(dataplatform: DataPlateform):
+    assert dataplatform.graphset().node_sets == {
         frozenset(
             {
                 Node(name="test-bronze-username"),
@@ -157,15 +151,130 @@ def test_content_from_graph(dataplatform: DataPlateform):
             }
         ),
     }

+    assert dataplatform.graphset().edges == [
+        EdgeOnSet(
+            arrow="foo_flux",
+            sources=[Node(name="test-raw-username"), Node(name="test-raw-recovery")],
+            targets=[Node(name="test-bronze-foo")],
+            metadata={},
+        ),
+        EdgeOnSet(
+            arrow="copy_flux",
+            sources=[Node(name="test-raw-username")],
+            targets=[Node(name="test-bronze-username")],
+            metadata={},
+        ),
+    ]
+
+
+def test_content_from_graph(dataplatform: DataPlateform):
+    assert dataplatform.graph().nodes == {
+        Node(name="test-raw-recovery", metadata={}),
+        Node(name="test-raw-salary", metadata={}),
+        Node(name="test-raw-username", metadata={}),
+        Node(name="test-bronze-username", metadata={}),
+        Node(name="test-bronze-foo", metadata={}),
+    }
+
+    assert dataplatform.graph().edges == [
+        Edge(
+            arrow="foo_flux",
+            source=Node(name="test-raw-username"),
+            target=Node(name="test-bronze-foo"),
+            metadata={},
+        ),
+        Edge(
+            arrow="foo_flux",
+            source=Node(name="test-raw-recovery"),
+            target=Node(name="test-bronze-foo"),
+            metadata={},
+        ),
+        Edge(
+            arrow="copy_flux",
+            source=Node(name="test-raw-username"),
+            target=Node(name="test-bronze-username"),
+            metadata={},
+        ),
+    ]
+
+
+def test_content_from_graph_arguments(dataplatform: DataPlateform):
+    name_flux = lambda flux: f"flux-{flux.id}"
+    meta_flux = lambda flux: {"name": flux.name}
+    meta_table = lambda table: {"id": table.id, "partitions": table.partitions}
+
+    assert dataplatform.graph(
+        name_flux=name_flux, meta_flux=meta_flux, meta_table=meta_table
+    ).nodes == {
+        Node(name="test-bronze-foo", metadata={"id": "test-bronze-foo", "partitions": []}),
+        Node(
+            name="test-raw-salary", metadata={"id": "test-raw-salary", "partitions": ["salary.pdf"]}
+        ),
+        Node(
+            name="test-raw-recovery",
+            metadata={
+                "id": "test-raw-recovery",
+                "partitions": ["2022.csv", "2023.csv", "2024.csv"],
+            },
+        ),
+        Node(
+            name="test-bronze-username", metadata={"id": "test-bronze-username", "partitions": []}
+        ),
+        Node(
+            name="test-raw-username",
+            metadata={"id": "test-raw-username", "partitions": ["username.csv"]},
+        ),
+    }
+
+    assert dataplatform.graph(
+        name_flux=name_flux, meta_flux=meta_flux, meta_table=meta_table
+    ).edges == [
+        Edge(
+            arrow="flux-foo_flux",
+            source=Node(
+                name="test-raw-username",
+                metadata={"id": "test-raw-username", "partitions": ["username.csv"]},
+            ),
+            target=Node(
+                name="test-bronze-foo", metadata={"id": "test-bronze-foo", "partitions": []}
+            ),
+            metadata={"name": "foo"},
+        ),
+        Edge(
+            arrow="flux-foo_flux",
+            source=Node(
+                name="test-raw-recovery",
+                metadata={
+                    "id": "test-raw-recovery",
+                    "partitions": ["2022.csv", "2023.csv", "2024.csv"],
+                },
+            ),
+            target=Node(
+                name="test-bronze-foo", metadata={"id": "test-bronze-foo", "partitions": []}
+            ),
+            metadata={"name": "foo"},
+        ),
+        Edge(
+            arrow="flux-copy_flux",
+            source=Node(
+                name="test-raw-username",
+                metadata={"id": "test-raw-username", "partitions": ["username.csv"]},
+            ),
+            target=Node(
+                name="test-bronze-username",
+                metadata={"id": "test-bronze-username", "partitions": []},
+            ),
+            metadata={"name": "copy"},
+        ),
+    ]
 def test_execute_flux(dataplatform: DataPlateform):
-    meta = dataplatform.execute_flux("foo")
+    meta = dataplatform.execute_flux("foo_flux")
     assert meta.data == {"who": "foo"}

     assert dataplatform.repository("test").schema("test-bronze").tables == []
-    meta = dataplatform.execute_flux("raw_brz_copy_username")
+    meta = dataplatform.execute_flux("copy_flux")
     assert meta.data == {"src_size": 283, "tgt_size": 283}
     assert dataplatform.repository("test").schema("test-bronze").tables == ["test-bronze-username"]

View File

@@ -1,39 +0,0 @@
from pathlib import Path

import pytest

from plesna.dataplatform import DataPlateform
from plesna.datastore.fs_datacatalogue import FSDataCatalogue

FIXTURE_DIR = Path(__file__).parent / Path("raw_data")


@pytest.fixture
def raw_catalogue(tmp_path):
    raw_path = Path(tmp_path) / "raw"
    return FSDataCatalogue(raw_path)


@pytest.fixture
def bronze_catalogue(tmp_path):
    bronze_path = Path(tmp_path) / "bronze"
    return FSDataCatalogue(bronze_path)


@pytest.fixture
def silver_catalogue(tmp_path):
    silver_path = Path(tmp_path) / "silver"
    return FSDataCatalogue(silver_path)


@pytest.fixture
def dataplateform(
    raw_catalogue: FSDataCatalogue,
    bronze_catalogue: FSDataCatalogue,
    silver_catalogue: FSDataCatalogue,
):
    dp = DataPlateform()
    dp.add_datacatalague("raw", raw_catalogue)
    dp.add_datacatalague("bronze", bronze_catalogue)
    dp.add_datacatalague("silver", silver_catalogue)
    pass

View File

@@ -3,7 +3,7 @@ from pathlib import Path
 import pytest

-from plesna.storage.repository.fs_repository import FSRepository
+from plesna.storage.data_repository.fs_data_repository import FSDataRepository

 FIXTURE_DIR = Path(__file__).parent.parent / Path("./raw_datas/")
@@ -20,7 +20,7 @@ def location(tmp_path):
 def test_init(location):
-    repo = FSRepository("example", "example", location)
+    repo = FSDataRepository("example", "example", location)
     assert repo.ls() == [
         "schema",
     ]
@@ -44,8 +44,8 @@ def test_init(location):
 @pytest.fixture
-def repository(location) -> FSRepository:
-    return FSRepository("repo_id", "example", location)
+def repository(location) -> FSDataRepository:
+    return FSDataRepository("repo_id", "example", location)


 def test_list_schemas(repository):

View File

@@ -0,0 +1,182 @@
from datetime import datetime
import shutil
from pathlib import Path

import pytest

from plesna.models.flux import FluxMetaData
from plesna.storage.metadata_repository.fs_metadata_repository import FSMetaDataRepository
from plesna.storage.metadata_repository.metadata_repository import ExecutionLog, ModificationLog


@pytest.fixture
def location(tmp_path):
    catalogpath = tmp_path / "catalog"
    catalogpath.mkdir()
    return catalogpath


def test_init(location):
    repo = FSMetaDataRepository(location)


@pytest.fixture
def metadata_repository(location) -> FSMetaDataRepository:
    return FSMetaDataRepository(location)


def test_add_flux(location, metadata_repository):
    flux_id = "my_flux"
    metadata_repository.add_flux(flux_id)

    metadata_filepath = location / metadata_repository.OBJECTS["flux"]["filemodel"].format(
        id=flux_id
    )
    assert metadata_filepath.exists()

    with open(metadata_filepath, "r") as csvfile:
        content = csvfile.read()
    assert content == "datetime,output\n"


def test_add_and_list_fluxes(metadata_repository):
    flux_ids = ["my_flux", "flux2", "blahblah"]
    for f in flux_ids:
        metadata_repository.add_flux(f)

    assert metadata_repository.fluxes() == flux_ids


def test_register_flux_execution(location, metadata_repository):
    flux_id = "my_flux"
    metadata_repository.add_flux(flux_id)

    metadata_repository.register_flux_execution(
        flux_id,
        datetime(2023, 3, 15, 14, 30),
        output={
            "truc": "machin",
        },
    )

    metadata_filepath = location / metadata_repository.OBJECTS["flux"]["filemodel"].format(
        id=flux_id
    )
    with open(metadata_filepath, "r") as csvfile:
        content = csvfile.read()
    assert (
        content == 'datetime,output\n2023-03-15T14:30:00,"{""data"":{""truc"":""machin""}}"\n'
    )


def test_register_and_get_exec_logs(metadata_repository):
    flux_id = "my_flux"
    metadata_repository.add_flux(flux_id)

    metadata_repository.register_flux_execution(
        flux_id,
        datetime(2023, 3, 15, 14, 30),
        output={"truc": "machin"},
    )
    metadata_repository.register_flux_execution(
        flux_id,
        datetime(2024, 3, 15, 14, 30),
        output={
            "truc": "chose",
        },
    )
    logs = metadata_repository.flux_logs(flux_id)
    assert logs == [
        ExecutionLog(
            datetime=datetime(2023, 3, 15, 14, 30),
            output=FluxMetaData(data={"truc": "machin"}),
        ),
        ExecutionLog(
            datetime=datetime(2024, 3, 15, 14, 30),
            output=FluxMetaData(data={"truc": "chose"}),
        ),
    ]


def test_register_and_get_last_exec_log(metadata_repository):
    flux_id = "my_flux"
    metadata_repository.add_flux(flux_id)

    metadata_repository.register_flux_execution(
        flux_id,
        datetime(2023, 3, 15, 14, 30),
        output={"truc": "machin"},
    )
    metadata_repository.register_flux_execution(
        flux_id,
        datetime(2024, 3, 15, 14, 30),
        output={
            "truc": "chose",
        },
    )
    logs = metadata_repository.flux(flux_id)
    assert logs == ExecutionLog(
        datetime=datetime(2024, 3, 15, 14, 30),
        output=FluxMetaData(data={"truc": "chose"}),
    )


def test_add_and_list_tables(metadata_repository):
    table_ids = ["my_table", "table2", "blahblah"]
    for f in table_ids:
        metadata_repository.add_table(f)

    assert metadata_repository.tables() == table_ids


def test_register_table_modification(location, metadata_repository):
    table_id = "my_table"
    flux_id = "my_flux"
    metadata_repository.add_table(table_id)

    metadata_repository.register_table_modification(
        table_id, datetime(2023, 3, 15, 14, 30), flux_id
    )

    metadata_filepath = location / metadata_repository.OBJECTS["table"]["filemodel"].format(
        id=table_id
    )
    with open(metadata_filepath, "r") as csvfile:
        content = csvfile.read()
    assert content == "datetime,flux_id\n2023-03-15T14:30:00,my_flux\n"


def test_register_and_get_mod_logs(metadata_repository):
    table_id = "my_table"
    flux_id = "my_flux"
    metadata_repository.add_table(table_id)

    metadata_repository.register_table_modification(
        table_id, datetime(2023, 3, 15, 14, 30), flux_id
    )
    metadata_repository.register_table_modification(
        table_id, datetime(2024, 3, 15, 14, 30), flux_id
    )
    logs = metadata_repository.table_logs(table_id)
    assert logs == [
        ModificationLog(datetime=datetime(2023, 3, 15, 14, 30), flux_id=flux_id),
        ModificationLog(datetime=datetime(2024, 3, 15, 14, 30), flux_id=flux_id),
    ]


def test_register_and_get_last_log(metadata_repository):
    table_id = "my_table"
    flux_id = "my_flux"
    metadata_repository.add_table(table_id)

    metadata_repository.register_table_modification(
        table_id, datetime(2023, 3, 15, 14, 30), flux_id
    )
    metadata_repository.register_table_modification(
        table_id, datetime(2024, 3, 15, 14, 30), flux_id
    )
    logs = metadata_repository.table(table_id)
    assert logs == ModificationLog(datetime=datetime(2024, 3, 15, 14, 30), flux_id=flux_id)