Compare commits

37 Commits

SHA1 Message Date
ec19534094 chore: add pyproject 2025-03-02 18:06:43 +01:00
d4428187d1 Feat: add tests 2025-03-02 18:04:50 +01:00
9118feb4c6 refact: use flux_id instead of name 2025-01-19 14:59:47 +01:00
d7716a4b8e Feat: add table logs retrieving 2025-01-19 06:47:16 +01:00
478a8c2403 Feat: register table modifications 2025-01-18 07:31:30 +01:00
8882317a47 Feat: add listing fluxes 2025-01-17 06:29:54 +01:00
2a387a1bc8 Feat: retrieve last log 2025-01-15 18:12:11 +01:00
eec3a13dbb refact: rename metadata to log 2025-01-15 18:01:30 +01:00
8623cd5960 Feat: read and write flux logs 2025-01-15 17:48:44 +01:00
543b3fe98e Feat: start working on metadata_repository 2025-01-15 06:56:46 +01:00
1a49158afa refact: move repository to data_repository 2025-01-14 07:00:19 +01:00
bb691acc14 refact: rename parameters in converting to graph function 2025-01-11 06:35:40 +01:00
90472ac868 Feat: use function to build graphs 2025-01-06 07:12:08 +01:00
0ae6439217 refact: replace edge_kwrds with metadata in models 2025-01-05 18:40:19 +01:00
2f170d91b6 test: add test on graph for dataplatform 2025-01-05 18:27:26 +01:00
5ebde14be9 Feat: add to_graph and is_valid_dag for graph_set 2025-01-05 16:42:57 +01:00
44a7eed5b4 feat: build dataplatform graph and graphset dynamically 2025-01-05 16:25:22 +01:00
f2ed76c8aa feat: add node in graphset when add flux 2025-01-05 15:55:50 +01:00
041e459ca0 refact: move id and name to flux 2025-01-05 15:50:51 +01:00
e4af62b136 refact: move Transformation to flux model 2025-01-05 15:43:29 +01:00
9a5c581f31 refact: move Node, Edge and EdgeOnSet to models 2025-01-05 15:37:56 +01:00
09783f9c1e Feat: flux takes list of tables for sources and targets 2025-01-05 15:31:40 +01:00
8a43a93cda refact: repo ids are not based on path but on id 2025-01-05 15:13:38 +01:00
ae61fd3c12 refact: use repository id in dataplatform 2025-01-05 14:55:46 +01:00
d256fbf169 Fix: repo id use tests 2025-01-05 14:34:16 +01:00
48964ad561 Feat: use id in repository 2025-01-05 11:27:52 +01:00
b9dade2701 Feat: add extract_values_from_pattern 2025-01-05 10:46:30 +01:00
ed8f91d78b Feat: add tables method to fs_repository 2025-01-05 07:01:03 +01:00
d1c1b7420d refact: replace callback with str for arrow in graph_set 2025-01-05 06:51:14 +01:00
f0315d09b9 refact: reorganize raw_datas and adapt tests 2025-01-05 06:42:51 +01:00
86f0dcc49e Feat: execute flux on dataplatform 2025-01-04 21:33:05 +01:00
d04bfe1d44 Feat: add execute_flux 2025-01-04 15:30:32 +01:00
1446c166ca Feat: add flux in dataplatform 2025-01-04 13:51:24 +01:00
beb9fd5465 Feat: add repository to dataplatform 2025-01-03 16:01:01 +01:00
78d6ac12bf Fix: remove recursive schemas for fs repository 2025-01-03 16:00:40 +01:00
350c03dbfe Fix: adapt to new Table form 2025-01-03 15:56:29 +01:00
e28ab332a7 feat: move fs_datacatalogue to fs_repository 2025-01-03 15:54:18 +01:00
42 changed files with 1372 additions and 428 deletions


@@ -3,6 +3,6 @@ from plesna.models.flux import Flux, FluxMetaData
 def consume_flux(flux: Flux) -> FluxMetaData:
     metadata = flux.transformation.function(
-        sources=flux.sources, targets=flux.targets, **flux.transformation.extra_kwrds
+        sources=flux.sources_dict, targets=flux.targets_dict, **flux.transformation.extra_kwrds
     )
     return FluxMetaData(data=metadata)
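To make the new contract concrete, here is a minimal, self-contained sketch of how consume_flux now feeds the transformation with id-keyed dicts. The table ids and the toy transformation are invented for illustration, not taken from the repo:

    from plesna.compute.consume_flux import consume_flux
    from plesna.models.flux import Flux, Transformation
    from plesna.models.storage import Table

    def report(sources, targets, **kwargs):
        # sources/targets arrive as {table.id: Table} thanks to sources_dict/targets_dict
        return {"read": list(sources), "written": list(targets)}

    src = Table(id="src", repo_id="r", schema_id="s", name="src", value="here", datas=["d"])
    tgt = Table(id="tgt", repo_id="r", schema_id="s", name="tgt", value="there", datas=["d"])
    flux = Flux(id="demo", name="demo", sources=[src], targets=[tgt],
                transformation=Transformation(function=report))
    print(consume_flux(flux).data)  # {'read': ['src'], 'written': ['tgt']}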


@@ -1,5 +1,11 @@
-from plesna.datastore.datacatalogue import DataCatalogue
+from collections.abc import Callable
+
+from plesna.compute.consume_flux import consume_flux
+from plesna.graph.graph import Graph
 from plesna.graph.graph_set import GraphSet
+from plesna.models.flux import Flux, FluxMetaData
+from plesna.models.graphs import Node
+from plesna.models.libs.flux_graph import flux_to_edgeonset
+from plesna.storage.data_repository.data_repository import DataRepository


 class DataPlateformError(Exception):
@@ -8,20 +14,80 @@ class DataPlateformError(Exception):
 class DataPlateform:
     def __init__(self):
-        self._graphset = GraphSet()
         self._metadata_engine = ""
-        self._transformations = {}
-        self._datacatalogues = {}
+        self._fluxes = {}
+        self._repositories = {}

-    def add_datacatalague(self, name: str, datacatalogue: DataCatalogue):
-        if name in self._datacatalogues:
-            raise DataPlateformError("The datacatalogue {name} already exists")
-        self._datacatalogues[name] = datacatalogue
+    def add_repository(self, repository: DataRepository) -> str:
+        if repository.id in self._repositories:
+            raise DataPlateformError(f"The repository {repository.id} already exists")
+        self._repositories[repository.id] = repository
+        return repository.id

     @property
-    def datacatalogues(self):
-        return list(self._datacatalogues)
+    def repositories(self) -> list[str]:
+        return list(self._repositories)

-    def get_datacatalogue(self, name: str):
-        return self._datacatalogues[name]
+    def repository(self, id: str) -> DataRepository:
+        return self._repositories[id]
+
+    def is_valid_flux(self, flux: Flux) -> bool:
+        return True
+
+    def add_flux(self, flux: Flux) -> str:
+        if flux.id in self._fluxes:
+            raise DataPlateformError(f"The flux {flux} already exists")
+        assert self.is_valid_flux(flux)
+        self._fluxes[flux.id] = flux
+        return flux.id
+
+    @property
+    def fluxes(self) -> list[str]:
+        return list(self._fluxes)
+
+    def flux(self, flux_id: str) -> Flux:
+        return self._fluxes[flux_id]
+
+    def execute_flux(self, flux_id: str) -> FluxMetaData:
+        if flux_id not in self._fluxes:
+            raise DataPlateformError(f"The flux {flux_id} is not registered")
+        return consume_flux(self._fluxes[flux_id])
+
+    def graphset(
+        self,
+        name_flux: Callable = lambda flux: flux.id,
+        meta_flux: Callable = lambda _: {},
+        name_table: Callable = lambda table: table.id,
+        meta_table: Callable = lambda _: {},
+    ) -> GraphSet:
+        graphset = GraphSet()
+        for flux in self._fluxes.values():
+            edge = flux_to_edgeonset(flux, name_flux, meta_flux, name_table, meta_table)
+            graphset.append(edge)
+        return graphset
+
+    def graph(
+        self,
+        name_flux: Callable = lambda flux: flux.id,
+        meta_flux: Callable = lambda _: {},
+        name_table: Callable = lambda table: table.id,
+        meta_table: Callable = lambda _: {},
+    ) -> Graph:
+        """Get the graph of fluxes and tables
+
+        :param name_flux: function on flux to name the edge
+        :param meta_flux: function on flux to attribute metadata to the edge
+        :param name_table: function on table to name nodes
+        :param meta_table: function on table to attribute metadata to nodes
+        """
+        graph = self.graphset(name_flux, meta_flux, name_table, meta_table).to_graph()
+        for repo in self._repositories.values():
+            for schema in repo.schemas():
+                for table in repo.tables(schema):
+                    t = repo.table(table)
+                    graph.add_node(Node(name=name_table(t), metadata=meta_table(t)))
+        return graph
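As a quick orientation, a hypothetical no-op flux is enough to drive the new fluxes/execute_flux/graph API end to end; no repository is needed just to register and run a flux. This is a sketch with invented ids, not a snippet from the repo:

    from plesna.dataplatform import DataPlateform
    from plesna.models.flux import Flux, Transformation
    from plesna.models.storage import Table

    def make_table(table_id: str) -> Table:
        # hypothetical table; repo_id/schema_id values are placeholders
        return Table(id=table_id, repo_id="r", schema_id="s", name=table_id, value=table_id, datas=[])

    noop = Flux(id="noop", name="noop",
                sources=[make_table("in")], targets=[make_table("out")],
                transformation=Transformation(function=lambda sources, targets: {"ok": True}))

    dp = DataPlateform()
    dp.add_flux(noop)
    print(dp.fluxes)                            # ['noop']
    print(dp.execute_flux("noop").data)         # {'ok': True}
    print([e.arrow for e in dp.graph().edges])  # ['noop']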


@@ -1,3 +0,0 @@
class DataStore:
def __init__(self, name):
self._name


@@ -1,81 +0,0 @@
from pathlib import Path
from pydantic import BaseModel, computed_field
from plesna.models.storage import Schema, Table
from .datacatalogue import DataCatalogue
class FakeSchema(BaseModel):
name: str
@computed_field
@property
def ref(self) -> Schema:
return Schema(
id=str(self.name),
value=str(self.name),
)
class FakeTable(BaseModel):
name: str
data: dict[str, list]
@computed_field
@property
def ref(self) -> Table:
return Table(
id=str(self.name),
value=str(self.name),
)
class FakeDataCatalogue(DataCatalogue):
"""DataCatalogue based on dictionnaries"""
def __init__(self, name: str):
self.name = name
def ls(
self, dir="", only_files=False, only_directories=False, recursive=False
) -> list[str]:
dirpath = self._basepath / dir
if only_files:
return [
str(f.relative_to(dirpath))
for f in dirpath.iterdir()
if not f.is_dir() and not str(f).startswith(".")
]
if only_directories:
if recursive:
return [
str(f[0].relative_to(dirpath))
for f in dirpath.walk()
if not str(f).startswith(".")
]
return [
str(f.relative_to(dirpath))
for f in dirpath.iterdir()
if f.is_dir() and not str(f).startswith(".")
]
return [
str(f.relative_to(dirpath))
for f in dirpath.iterdir()
if not str(f).startswith(".")
]
def schemas(self) -> dict[str, FSSchema]:
"""List schemas (sub directories within basepath)"""
subdirectories = self.ls("", only_directories=True, recursive=True)
return {str(path): FSSchema(path=path) for path in subdirectories}
def tables(self, schema_id=".") -> dict[str, FSTable]:
"""List table in schema (which are files in the directory)"""
schema_path = schema_id
return {path: FSTable(path=path) for path in self.ls(schema_path, only_files=True)}


@@ -1,91 +0,0 @@
from pathlib import Path
from pydantic import BaseModel, computed_field
from plesna.models.storage import Schema, Table
from .datacatalogue import DataCatalogue
class FSTable(BaseModel):
path: Path
@computed_field
@property
def ref(self) -> Table:
return Table(
id=str(self.path),
value=str(self.path),
)
class FSSchema(BaseModel):
path: Path
tables: list[str]
@computed_field
@property
def ref(self) -> Schema:
return Schema(
id=str(self.path),
value=str(self.path),
)
class FSDataCatalogue(DataCatalogue):
"""DataCatalogue based on files tree structure"""
def __init__(self, name: str, basepath: str = "."):
self._basepath = Path(basepath)
self.name = name
assert self._basepath.exists()
def ls(
self, dir="", only_files=False, only_directories=False, recursive=False
) -> list[str]:
dirpath = self._basepath / dir
if only_files:
return [
str(f.relative_to(dirpath))
for f in dirpath.iterdir()
if not f.is_dir() and not str(f).startswith(".")
]
if only_directories:
if recursive:
return [
str(f[0].relative_to(dirpath))
for f in dirpath.walk()
if not str(f).startswith(".")
]
return [
str(f.relative_to(dirpath))
for f in dirpath.iterdir()
if f.is_dir() and not str(f).startswith(".")
]
return [
str(f.relative_to(dirpath))
for f in dirpath.iterdir()
if not str(f).startswith(".")
]
@property
def schemas(self) -> list[str]:
"""List schemas (sub directories within basepath)"""
subdirectories = self.ls("", only_directories=True, recursive=True)
return [str(d) for d in subdirectories]
def schema(self, schema: str) -> FSSchema:
"""List schemas (sub directories within basepath)"""
tables = self.ls(schema, only_files=True)
return FSSchema(path=Path(schema), tables=tables)
def table(self, schema: str, table:str) -> FSTable:
"""List table in schema (which are files in the directory)"""
schema_path = schema_id
return {path: FSTable(path=path) for path in self.ls(schema_path, only_files=True)}


@@ -1,28 +1,13 @@
-from functools import reduce
-from typing import Callable
+from typing import Set

 from pydantic import BaseModel

+from functools import reduce
+
+from plesna.models.graphs import Node, Edge
-
-class Node(BaseModel):
-    name: str
-    infos: dict = {}
-
-    def __hash__(self):
-        return hash(self.name)
-
-
-class Edge(BaseModel):
-    arrow_name: str
-    source: Node
-    target: Node
-    edge_kwrds: dict = {}
-

 class Graph:
     def __init__(self, nodes: list[Node] = [], edges: list[Edge] = []):
-        self._edges = []
-        self._nodes = set()
+        self._edges: list[Edge] = []
+        self._nodes: Set[Node] = set()
         self.add_edges(edges)
         self.add_nodes(nodes)


@@ -1,21 +1,7 @@
-from typing import Callable
+from typing import Set

-from pydantic import BaseModel
+from itertools import product
+
+from plesna.graph.graph import Graph
+from plesna.models.graphs import Edge, EdgeOnSet

-
-class Node(BaseModel):
-    name: str
-    infos: dict = {}
-
-    def __hash__(self):
-        return hash(self.name)
-
-
-class EdgeOnSet(BaseModel):
-    arrow: Callable
-    sources: dict[str, Node]
-    targets: dict[str, Node]
-    edge_kwrds: dict = {}
-

 class GraphSet:
@@ -25,12 +11,29 @@ class GraphSet:
     def append(self, edge: EdgeOnSet):
         self._edges.append(edge)
-        self._node_sets.add(frozenset(edge.sources.values()))
-        self._node_sets.add(frozenset(edge.targets.values()))
+        self._node_sets.add(frozenset(edge.sources))
+        self._node_sets.add(frozenset(edge.targets))

     @property
-    def node_sets(self):
+    def edges(self) -> list[EdgeOnSet]:
+        return self._edges
+
+    @property
+    def node_sets(self) -> Set[frozenset]:
         return self._node_sets

-    def is_valid_dag(self):
-        pass
+    def to_graph(self) -> Graph:
+        graph = Graph()
+        for node_set in self.node_sets:
+            graph.add_nodes(node_set)
+
+        for edge in self._edges:
+            flatten_edge = [
+                Edge(arrow=edge.arrow, source=s, target=t, metadata=edge.metadata)
+                for (s, t) in product(edge.sources, edge.targets)
+            ]
+            graph.add_edges(flatten_edge)
+        return graph
+
+    def is_valid_dag(self) -> bool:
+        return self.to_graph().is_dag()
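to_graph flattens each EdgeOnSet into the cartesian product of its sources and targets, which is what makes cycle detection possible. A small sketch, with invented edge names and assuming Graph.is_dag() behaves as its name suggests:

    from plesna.graph.graph_set import GraphSet
    from plesna.models.graphs import EdgeOnSet, Node

    a, b = Node(name="A"), Node(name="B")
    gs = GraphSet()
    gs.append(EdgeOnSet(arrow="a-to-b", sources=[a], targets=[b]))
    gs.append(EdgeOnSet(arrow="b-to-a", sources=[b], targets=[a]))  # closes the cycle A -> B -> A
    print(gs.is_valid_dag())  # False: the flattened graph is not a DAG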


@@ -0,0 +1,18 @@
import re
class StringToolsError(ValueError):
pass
def extract_values_from_pattern(pattern, string):
regex = re.sub(r"{(.+?)}", r"(?P<_\1>.+)", pattern)
search = re.search(regex, string)
if search:
values = list(search.groups())
keys = re.findall(r"{(.+?)}", pattern)
_dict = dict(zip(keys, values))
return _dict
raise StringToolsError(f"Can't parse '{string}' with the pattern '{pattern}'")
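The helper works by translating each {placeholder} into a named regex group and zipping the matched groups back onto the placeholder names. A quick sketch with made-up values:

    from plesna.libs.string_tools import extract_values_from_pattern

    # "{repo_id}-{schema_name}" is rewritten to the regex "(?P<_repo_id>.+)-(?P<_schema_name>.+)"
    print(extract_values_from_pattern("{repo_id}-{schema_name}", "test-raw"))
    # -> {'repo_id': 'test', 'schema_name': 'raw'}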


@@ -1,14 +1,48 @@
-from pydantic import BaseModel
+from collections.abc import Callable
+
+from pydantic import BaseModel, computed_field

 from plesna.models.storage import Table
-from plesna.models.transformation import Transformation
+
+
+class Transformation(BaseModel):
+    """
+    The function has to take at least two arguments: sources and targets.
+    Other arguments come through extra_kwrds.
+    The function has to return its metadata as a dict.
+    """
+
+    function: Callable
+    extra_kwrds: dict = {}


 class Flux(BaseModel):
-    sources: dict[str, Table]
-    targets: dict[str, Table]
+    id: str
+    name: str
+    sources: list[Table]
+    targets: list[Table]
     transformation: Transformation

+    @computed_field
+    @property
+    def sources_dict(self) -> dict[str, Table]:
+        return {s.id: s for s in self.sources}
+
+    @computed_field
+    @property
+    def sources_id(self) -> list[str]:
+        return [s.id for s in self.sources]
+
+    @computed_field
+    @property
+    def targets_id(self) -> list[str]:
+        return [t.id for t in self.targets]
+
+    @computed_field
+    @property
+    def targets_dict(self) -> dict[str, Table]:
+        return {t.id: t for t in self.targets}


 class FluxMetaData(BaseModel):
     data: dict

plesna/models/graphs.py (new file)

@@ -0,0 +1,23 @@
from pydantic import BaseModel
class Node(BaseModel):
name: str
metadata: dict = {}
def __hash__(self):
return hash(self.name)
class Edge(BaseModel):
arrow: str
source: Node
target: Node
metadata: dict = {}
class EdgeOnSet(BaseModel):
arrow: str
sources: list[Node]
targets: list[Node]
metadata: dict = {}


@@ -0,0 +1,29 @@
from collections.abc import Callable
from plesna.models.flux import Flux
from plesna.models.graphs import EdgeOnSet, Node
def flux_to_edgeonset(
flux: Flux,
name_flux: Callable = lambda flux: flux.id,
meta_flux: Callable = lambda _: {},
name_table: Callable = lambda table: table.id,
meta_table: Callable = lambda _: {},
) -> EdgeOnSet:
"""Convert a flux to an EdgeOnSet
:param flux: the flux
:name_flux: function on flux which returns the name of the arrow from flux
:meta_flux: function on flux which returns a dict to store in metadata field
:name_table: function on table which returns the name of node
:meta_table: function on table which returns metadata of node
"""
sources = [Node(name=name_table(s), metadata=meta_table(s)) for s in flux.sources]
targets = [Node(name=name_table(s), metadata=meta_table(s)) for s in flux.targets]
return EdgeOnSet(
arrow=name_flux(flux),
sources=sources,
targets=targets,
metadata=meta_flux(flux),
)
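A short sketch of the conversion with a custom name_flux; the tables and ids below are invented:

    from plesna.models.flux import Flux, Transformation
    from plesna.models.libs.flux_graph import flux_to_edgeonset
    from plesna.models.storage import Table

    def make_table(table_id: str) -> Table:
        # hypothetical table; repo_id/schema_id values are placeholders
        return Table(id=table_id, repo_id="r", schema_id="s", name=table_id, value=table_id, datas=[])

    flux = Flux(id="copy", name="copy",
                sources=[make_table("a"), make_table("b")], targets=[make_table("c")],
                transformation=Transformation(function=lambda sources, targets: {}))

    edge = flux_to_edgeonset(flux, name_flux=lambda f: f"flux-{f.id}")
    print(edge.arrow, [n.name for n in edge.sources], [n.name for n in edge.targets])
    # flux-copy ['a', 'b'] ['c']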


@@ -2,24 +2,60 @@ from pydantic import BaseModel
 class Schema(BaseModel):
-    """Logical agregation for Table
+    """Where multiple tables are stored

     id: uniq identifier for the schema
-    value: string which describe where to find the schema in the storage system
+    repo_id: id of the repo where the schema belongs to
+    name: name of the schema
+    value: string which describes where to find the schema in the repository
     """

     id: str
+    repo_id: str
+    name: str
     value: str
+    tables: list[str] = []


 class Table(BaseModel):
-    """Place where data are stored
+    """Place where same structured data are stored

     id: uniq identifier for the table
+    repo_id: id of the repo where the table belongs to
+    schema_id: id of the schema where the table belongs to
+    name: the name of the table
     value: string which describes where to find the table in the storage system
+    partitions: list of partitions
+    datas: list of strings to access data
     """

     id: str
+    repo_id: str
+    schema_id: str
+    name: str
     value: str
+    datas: list[str]
+    partitions: list[str] = []
+    metadata: dict = {}
+
+
+class Partition(BaseModel):
+    """Place where data are stored
+
+    id: uniq identifier for the partition
+    repo_id: id of the repo where the partition belongs to
+    schema_id: id of the schema where the partition belongs to
+    table_id: id of the table where the partition belongs to
+    name: the name of the partition
+    value: string which describes where to find the partition in the storage system
+    """
+
+    id: str
+    repo_id: str
+    schema_id: str
+    table_id: str
+    name: str
+    value: str


@@ -1,15 +0,0 @@
from collections.abc import Callable
from pydantic import BaseModel
class Transformation(BaseModel):
"""
The function have to have at least 2 arguments: sources and targets
Other arguments will came throught extra_kwrds
The function will have to return metadata as dict
"""
function: Callable
extra_kwrds: dict = {}


@@ -0,0 +1,37 @@
import abc
from plesna.models.storage import Partition, Schema, Table
class DataRepository:
def __init__(self, id: str, name: str):
self._id = id
self._name = name
@property
def id(self) -> str:
return self._id
@property
def name(self) -> str:
return self._name
@abc.abstractmethod
def schemas(self) -> list[str]:
"""List schema's ids"""
raise NotImplementedError
@abc.abstractmethod
def schema(self, schema_id: str) -> Schema:
"""Get the schema properties"""
raise NotImplementedError
@abc.abstractmethod
def tables(self, schema_id: str) -> list[str]:
"""List table's name in schema (the id)"""
raise NotImplementedError
@abc.abstractmethod
def table(self, table_id: str) -> Table:
"""Get the table properties (the id)"""
raise NotImplementedError


@@ -0,0 +1,197 @@
from pathlib import Path
from pydantic import BaseModel, computed_field
from plesna.libs.string_tools import extract_values_from_pattern
from plesna.models.storage import Schema, Table
from plesna.storage.data_repository.data_repository import DataRepository
class FSTable(BaseModel):
name: str
repo_id: str
schema_id: str
id: str
path: Path
is_partitionned: bool
partitions: list[str] = []
@computed_field
@property
def ref(self) -> Table:
if self.is_partitionned:
datas = [str(self.path.absolute() / p) for p in self.partitions]
else:
datas = [str(self.path.absolute())]
return Table(
id=self.id,
repo_id=self.repo_id,
schema_id=self.schema_id,
name=self.name,
value=str(self.path.absolute()),
partitions=self.partitions,
datas=datas,
)
class FSSchema(BaseModel):
name: str
repo_id: str
id: str
path: Path
tables: list[str]
@computed_field
@property
def ref(self) -> Schema:
return Schema(
id=self.id,
repo_id=self.repo_id,
name=self.name,
value=str(self.path.absolute()),
tables=self.tables,
)
class FSRepositoryError(ValueError):
pass
class FSDataRepository(DataRepository):
"""Data Repository based on files tree structure
- first level: schemas
- second level: tables
- third level: partition (actual datas)
"""
ID_FMT = {
"schema": "{repo_id}-{schema_name}",
"table": "{schema_id}-{table_name}",
}
def __init__(self, id: str, name: str, basepath: str):
super().__init__(id, name)
self._basepath = Path(basepath)
assert self._basepath.exists()
def ls(self, dir="", only_files=False, only_directories=False, recursive=False) -> list[str]:
"""List files in dir
:param dir: relative path from self._basepath
:param only_files: if true return only files
:param only_directories: if true return only directories
        :param recursive: list content recursively
:return: list of string describing path from self._basepath / dir
"""
dirpath = self._basepath / dir
if recursive:
paths = dirpath.rglob("*")
else:
paths = dirpath.iterdir()
if only_files:
return [
str(f.relative_to(dirpath))
for f in paths
if not f.is_dir() and not str(f).startswith(".")
]
if only_directories:
return [
str(f.relative_to(dirpath))
for f in paths
if f.is_dir() and not str(f).startswith(".")
]
return [str(f.relative_to(dirpath)) for f in paths if not str(f).startswith(".")]
def parse_id(self, string: str, id_type: str) -> dict:
        if id_type not in self.ID_FMT:
            raise FSRepositoryError(
                f"Wrong id_type. Got {id_type}, needs to be one of {list(self.ID_FMT)}"
            )
        parsed = extract_values_from_pattern(self.ID_FMT[id_type], string)
        if not parsed:
            raise FSRepositoryError(
                f"Wrong format for {id_type}. Got {string}, needs {self.ID_FMT[id_type]}"
            )
return parsed
def schemas(self) -> list[str]:
"""List schemas (sub directories within basepath)"""
subdirectories = self.ls("", only_directories=True)
return [
self.ID_FMT["schema"].format(repo_id=self.id, schema_name=d) for d in subdirectories
]
def _schema(self, schema_id: str) -> FSSchema:
"""List schemas (sub directories within basepath)"""
parsed = self.parse_id(schema_id, "schema")
repo_id = parsed["repo_id"]
schema_name = parsed["schema_name"]
schema_path = self._basepath / schema_name
if repo_id != self.id:
raise FSRepositoryError("Trying to get schema that don't belong in this repository")
tables = self.tables(schema_id)
return FSSchema(
name=schema_name,
id=schema_id,
repo_id=self.id,
path=schema_path,
tables=tables,
)
def schema(self, schema_id: str) -> Schema:
return self._schema(schema_id).ref
def _tables(self, schema_id: str) -> list[str]:
parsed = self.parse_id(schema_id, "schema")
tables = self.ls(parsed["schema_name"])
return [self.ID_FMT["table"].format(table_name=t, schema_id=schema_id) for t in tables]
def tables(self, schema_id: str = "") -> list[str]:
if schema_id:
return self._tables(schema_id)
tables = []
for schema in self.schemas():
tables += self._tables(schema)
return tables
def _table(self, table_id: str) -> FSTable:
"""Get infos on the table"""
parsed = self.parse_id(table_id, "table")
schema = self._schema(parsed["schema_id"])
if not schema.path.exists():
raise FSRepositoryError(f"The schema {schema.id} does not exists.")
table_subpath = f"{schema.name}/{parsed['table_name']}"
table_path = self._basepath / table_subpath
is_partitionned = table_path.is_dir()
if is_partitionned:
partitions = self.ls(table_subpath, only_files=True)
else:
partitions = []
return FSTable(
name=parsed["table_name"],
id=table_id,
repo_id=self.id,
schema_id=schema.id,
path=table_path,
is_partitionned=is_partitionned,
partitions=partitions,
)
def table(self, table_id: str) -> Table:
return self._table(table_id).ref
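Putting the ID_FMT conventions together: schema ids are {repo_id}-{schema_name} and table ids are {schema_id}-{table_name}, derived from a two-level directory tree. A minimal sketch against a throwaway directory, with invented names:

    import tempfile
    from pathlib import Path

    from plesna.storage.data_repository.fs_data_repository import FSDataRepository

    base = Path(tempfile.mkdtemp())
    (base / "raw" / "users").mkdir(parents=True)   # schema "raw", partitioned table "users"
    (base / "raw" / "users" / "2024.csv").touch()  # one partition

    repo = FSDataRepository("demo", "demo", base)
    print(repo.schemas())                           # ['demo-raw']
    print(repo.tables("demo-raw"))                  # ['demo-raw-users']
    print(repo.table("demo-raw-users").partitions)  # ['2024.csv']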


@@ -1,6 +1,6 @@
 import abc

-from plesna.models.storage import Schema, Table
+from plesna.models.storage import Schema


 class DataCatalogue:
@@ -19,16 +19,6 @@ class DataCatalogue:
         raise NotImplementedError

     @abc.abstractmethod
-    def tables(self, schema:str) -> list[str]:
+    def tables(self, schema: str) -> list[str]:
         """List table's name in schema"""
         raise NotImplementedError
-
-    @abc.abstractmethod
-    def table(self, schema:str, table:str) -> Table:
-        """Get the table properties"""
-        raise NotImplementedError
-
-    @abc.abstractmethod
-    def infos(self, table: str, schema: str) -> dict[str, str]:
-        """Get infos about the table"""
-        raise NotImplementedError


@@ -0,0 +1,132 @@
from pathlib import Path
from datetime import datetime
import csv
import json
from typing import Iterable
from plesna.libs.string_tools import StringToolsError, extract_values_from_pattern
from plesna.storage.metadata_repository.metadata_repository import (
ExecutionLog,
MetaDataRepository,
ModificationLog,
)
class FSMetaDataRepositoryError(ValueError):
pass
class FSMetaDataRepository(MetaDataRepository):
"""MetaData Repository based on csv files
    File organisation: executions and modifications are stored in csv files according to the "filemodel" patterns in OBJECTS
"""
OBJECTS = {
"flux": {"filemodel": "{id}_execution.csv", "logmodel": ExecutionLog},
"table": {"filemodel": "{id}_execution.csv", "logmodel": ModificationLog},
}
def __init__(self, basepath: str):
super().__init__()
self._basepath = Path(basepath)
assert self._basepath.exists()
def get_things(self, what: str) -> list[str]:
"""List all ids for 'what'"""
whats = []
for filepath in self._basepath.iterdir():
            try:
                found = extract_values_from_pattern(
                    self.OBJECTS[what]["filemodel"], filepath.name
                )
            except StringToolsError:
                pass
            else:
                whats.append(found["id"])
return whats
def fluxes(self) -> list[str]:
"""List fluxes's ids"""
return self.get_things(what="flux")
def tables(
self,
) -> list[str]:
"""List all table's ids"""
return self.get_things(what="table")
def _add_thing(self, what: str, id: str) -> str:
"""Add the new things 'what'"""
filepath = self._basepath / self.OBJECTS[what]["filemodel"].format(id=id)
filepath.touch()
with open(filepath, "a") as csvfile:
writer = csv.DictWriter(
csvfile, fieldnames=self.OBJECTS[what]["logmodel"].model_fields.keys()
)
writer.writeheader()
return id
def add_flux(self, flux_id: str) -> str:
"""Get the flux metadata"""
return self._add_thing(what="flux", id=flux_id)
def add_table(self, table_id: str) -> str:
"""Get the table metadata"""
return self._add_thing(what="table", id=table_id)
def _register_things_event(self, what: str, id: str, dt: datetime, event: dict) -> ExecutionLog:
filepath = self._basepath / self.OBJECTS[what]["filemodel"].format(id=id)
        if not filepath.exists():
raise FSMetaDataRepositoryError(f"The {what} {id} hasn't been added yet.")
metadata_ = self.OBJECTS[what]["logmodel"](datetime=dt, **event)
with open(filepath, "a") as csvfile:
writer = csv.DictWriter(
csvfile, fieldnames=self.OBJECTS[what]["logmodel"].model_fields.keys()
)
writer.writerow(metadata_.to_flat_dict())
return metadata_
def register_flux_execution(self, flux_id: str, dt: datetime, output: dict) -> ExecutionLog:
"""Get the flux metadata"""
return self._register_things_event("flux", flux_id, dt, {"output": {"data": output}})
    def register_table_modification(self, table_id: str, dt: datetime, flux_id: str) -> ModificationLog:
        """Record a table modification"""
return self._register_things_event("table", table_id, dt, {"flux_id": flux_id})
def _get_all_log(self, what: str, id: str) -> Iterable[dict]:
"""Generate log dict from history"""
filepath = self._basepath / self.OBJECTS[what]["filemodel"].format(id=id)
        if not filepath.exists():
raise FSMetaDataRepositoryError(f"The {what} {id} hasn't been added yet.")
with open(filepath, "r") as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
yield row
def flux_logs(self, flux_id: str) -> list[ExecutionLog]:
"""Get all flux logs"""
logs = []
for logline in self._get_all_log("flux", flux_id):
logline["output"] = json.loads(logline["output"])
logs.append(self.OBJECTS["flux"]["logmodel"](**logline))
return logs
def flux(self, flux_id: str) -> ExecutionLog:
"""Get the last flux log"""
        return max(self.flux_logs(flux_id), key=lambda log: log.datetime)
def table_logs(self, table_id: str) -> list[ModificationLog]:
"""Get all table's modification metadatas"""
return [ModificationLog(**log) for log in self._get_all_log("table", table_id)]
def table(self, table_id: str) -> ModificationLog:
"""Get the last table's modification metadatas"""
        return max(self.table_logs(table_id), key=lambda log: log.datetime)
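End to end, the metadata repository appends one csv row per event and replays them as pydantic models. A minimal sketch; the ids and output below are invented:

    import tempfile
    from datetime import datetime

    from plesna.storage.metadata_repository.fs_metadata_repository import FSMetaDataRepository

    meta = FSMetaDataRepository(tempfile.mkdtemp())
    meta.add_flux("copy_flux")
    meta.register_flux_execution("copy_flux", datetime(2025, 1, 15, 8, 0), output={"rows": 42})
    print(meta.flux("copy_flux").output.data)  # {'rows': 42}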


@@ -0,0 +1,81 @@
import abc
from datetime import datetime
from pydantic import BaseModel
from plesna.models.flux import FluxMetaData
class ModificationLog(BaseModel):
datetime: datetime
flux_id: str
def to_flat_dict(self):
return {"datetime": self.datetime.isoformat(), "flux_id": self.flux_id}
class ExecutionLog(BaseModel):
datetime: datetime
output: FluxMetaData
def to_flat_dict(self):
return {"datetime": self.datetime.isoformat(), "output": self.output.model_dump_json()}
class MetaDataRepository:
"""Object that stores metadata about flux, schema, tables"""
def __init__(self):
pass
@abc.abstractmethod
def fluxes(self) -> list[str]:
"""List fluxes's ids"""
raise NotImplementedError
@abc.abstractmethod
def add_flux(self, flux_id: str) -> str:
"""Get the flux metadata"""
raise NotImplementedError
@abc.abstractmethod
    def register_flux_execution(self, flux_id: str, dt: datetime, output: dict) -> ExecutionLog:
        """Record a flux execution"""
raise NotImplementedError
@abc.abstractmethod
    def flux(self, flux_id: str) -> ExecutionLog:
"""Get the flux last execution metadata"""
raise NotImplementedError
@abc.abstractmethod
    def flux_logs(self, flux_id: str) -> list[ExecutionLog]:
"""Get all the flux execution metadata"""
raise NotImplementedError
@abc.abstractmethod
def tables(
self,
) -> list[str]:
"""List all table's ids"""
raise NotImplementedError
@abc.abstractmethod
def add_table(self, table_id: str) -> str:
"""Get the table metadata"""
raise NotImplementedError
@abc.abstractmethod
    def register_table_modification(self, table_id: str, dt: datetime, flux_id: str) -> ModificationLog:
        """Record a table modification"""
raise NotImplementedError
@abc.abstractmethod
def table(self, table_id: str) -> ModificationLog:
"""Get the last table's modification metadatas"""
raise NotImplementedError
@abc.abstractmethod
def table_logs(self, table_id: str) -> list[ModificationLog]:
"""Get all table's modification metadatas"""
raise NotImplementedError

pyproject.toml (new file)

@@ -0,0 +1,16 @@
[project]
name = "plesna"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"ruff>=0.8.5",
]
[tool.ruff]
line-length = 100
indent-width = 4
[tool.ruff.lint]
select = ["E", "F"]
ignore = ["F401"]


@@ -1,18 +1,17 @@
 from plesna.compute.consume_flux import consume_flux
-from plesna.models.flux import Flux
+from plesna.models.flux import Flux, Transformation
 from plesna.models.storage import Table
-from plesna.models.transformation import Transformation


 def test_consume_flux():
-    sources = {
-        "src1": Table(id="src1", value="here"),
-        "src2": Table(id="src2", value="here"),
-    }
-    targets = {
-        "tgt1": Table(id="tgt1", value="this"),
-        "tgt2": Table(id="tgt2", value="that"),
-    }
+    sources = [
+        Table(id="src1", repo_id="test", schema_id="test", name="test", value="here", datas=["d"]),
+        Table(id="src2", repo_id="test", schema_id="test", name="test", value="here", datas=["d"]),
+    ]
+    targets = [
+        Table(id="tgt1", repo_id="test", schema_id="test", name="test", value="this", datas=["d"]),
+        Table(id="tgt2", repo_id="test", schema_id="test", name="test", value="that", datas=["d"]),
+    ]

     def func(sources, targets, **kwrds):
         return {
@@ -22,6 +21,8 @@ def test_consume_flux():
         }

     flux = Flux(
+        id="flux",
+        name="flux",
         sources=sources,
         targets=targets,
         transformation=Transformation(function=func, extra_kwrds={"extra": "super"}),


@@ -1,43 +1,280 @@
+import shutil
 from pathlib import Path

 import pytest

 from plesna.dataplatform import DataPlateform
-from plesna.datastore.fs_datacatalogue import FSDataCatalogue
+from plesna.models.graphs import Edge, EdgeOnSet, Node
+from plesna.models.flux import Flux, Transformation
+from plesna.storage.data_repository.fs_data_repository import FSDataRepository

-FIXTURE_DIR = Path(__file__).parent / Path("raw_data")
+FIXTURE_DIR = Path(__file__).parent.parent / Path("raw_datas")


 @pytest.fixture
-def raw_catalogue(tmp_path):
-    raw_path = Path(tmp_path) / "raw"
-    raw_path.mkdir()
-    return FSDataCatalogue("raw", raw_path)
-
-
-@pytest.fixture
-def bronze_catalogue(tmp_path):
-    bronze_path = Path(tmp_path) / "bronze"
-    bronze_path.mkdir()
-    return FSDataCatalogue("bronze", bronze_path)
-
-
-@pytest.fixture
-def silver_catalogue(tmp_path):
-    silver_path = Path(tmp_path) / "silver"
-    silver_path.mkdir()
-    return FSDataCatalogue("silver", silver_path)
-
-
-def test_add_catalogue(
-    raw_catalogue: FSDataCatalogue,
-    bronze_catalogue: FSDataCatalogue,
-    silver_catalogue: FSDataCatalogue,
-):
-    dp = DataPlateform()
-    dp.add_datacatalague("raw", raw_catalogue)
-    dp.add_datacatalague("bronze", bronze_catalogue)
-    dp.add_datacatalague("silver", silver_catalogue)
-
-    assert dp.datacatalogues == ["raw", "bronze", "silver"]
-    assert dp.get_datacatalogue("raw") == raw_catalogue
+def repository(tmp_path) -> FSDataRepository:
+    example_src = FIXTURE_DIR
+    assert example_src.exists()
+
+    raw_path = Path(tmp_path) / "raw"
+    shutil.copytree(src=example_src.absolute(), dst=raw_path.absolute())
+
+    bronze_path = Path(tmp_path) / "bronze"
+    bronze_path.mkdir()
+
+    silver_path = Path(tmp_path) / "silver"
+    silver_path.mkdir()
+
+    return FSDataRepository("test", "test", tmp_path)
+
+
+def test_add_repository(
+    repository: FSDataRepository,
+):
+    dp = DataPlateform()
+    dp.add_repository(repository)
+
+    assert dp.repositories == ["test"]
+    assert dp.repository("test") == repository
+
+
+@pytest.fixture
+def copy_flux(repository: FSDataRepository) -> Flux:
+    raw_username = [repository.table("test-raw-username")]
+    bronze_username = [repository.table("test-bronze-username")]
+
+    def copy(sources, targets):
+        src_path = Path(sources["test-raw-username"].datas[0])
+        tgt_path = Path(targets["test-bronze-username"].datas[0])
+        shutil.copy(src_path, tgt_path)
+        return {"src_size": src_path.stat().st_size, "tgt_size": tgt_path.stat().st_size}
+
+    extra_kwrds = {}
+
+    raw_brz_copy_username = Flux(
+        id="copy_flux",
+        name="copy",
+        sources=raw_username,
+        targets=bronze_username,
+        transformation=Transformation(function=copy, extra_kwrds=extra_kwrds),
+    )
+    return raw_brz_copy_username
+
+
+@pytest.fixture
+def foo_flux(repository: FSDataRepository) -> Flux:
+    src = [
+        repository.table("test-raw-username"),
+        repository.table("test-raw-recovery"),
+    ]
+    targets = [repository.table("test-bronze-foo")]
+
+    def foo(sources, targets):
+        return {"who": "foo"}
+
+    extra_kwrds = {}
+
+    flux = Flux(
+        id="foo_flux",
+        name="foo",
+        sources=src,
+        targets=targets,
+        transformation=Transformation(function=foo, extra_kwrds=extra_kwrds),
+    )
+    return flux
+
+
+def test_add_flux(repository: FSDataRepository, copy_flux: Flux, foo_flux: Flux):
+    dataplatform = DataPlateform()
+    dataplatform.add_repository(repository)
+
+    dataplatform.add_flux(flux=copy_flux)
+    assert dataplatform.fluxes == ["copy_flux"]
+
+    dataplatform.add_flux(flux=foo_flux)
+    assert dataplatform.fluxes == ["copy_flux", "foo_flux"]
+
+    assert dataplatform.flux("copy_flux") == copy_flux
+    assert dataplatform.flux("foo_flux") == foo_flux
+
+
+@pytest.fixture
+def dataplatform(
+    repository: FSDataRepository,
+    foo_flux: Flux,
+    copy_flux: Flux,
+) -> DataPlateform:
+    dp = DataPlateform()
+
+    dp.add_repository(repository)
+
+    dp.add_flux(foo_flux)
+    dp.add_flux(copy_flux)
+
+    return dp
def test_listing_content(dataplatform: DataPlateform):
assert dataplatform.repository("test").schemas() == ["test-raw", "test-bronze", "test-silver"]
assert dataplatform.repository("test").schema("test-raw").tables == [
"test-raw-username",
"test-raw-recovery",
"test-raw-salary",
]
assert dataplatform.repository("test").table("test-raw-username").partitions == ["username.csv"]
assert dataplatform.repository("test").table("test-raw-recovery").partitions == [
"2022.csv",
"2023.csv",
"2024.csv",
]
def test_content_from_graphset(dataplatform: DataPlateform):
assert dataplatform.graphset().node_sets == {
frozenset(
{
Node(name="test-bronze-username"),
}
),
frozenset(
{
Node(name="test-bronze-foo"),
}
),
frozenset(
{
Node(name="test-raw-username"),
}
),
frozenset(
{
Node(name="test-raw-username"),
Node(name="test-raw-recovery"),
}
),
}
assert dataplatform.graphset().edges == [
EdgeOnSet(
arrow="foo_flux",
sources=[Node(name="test-raw-username"), Node(name="test-raw-recovery")],
targets=[Node(name="test-bronze-foo")],
metadata={},
),
EdgeOnSet(
arrow="copy_flux",
sources=[Node(name="test-raw-username")],
targets=[Node(name="test-bronze-username")],
metadata={},
),
]
def test_content_from_graph(dataplatform: DataPlateform):
assert dataplatform.graph().nodes == {
Node(name="test-raw-recovery", metadata={}),
Node(name="test-raw-salary", metadata={}),
Node(name="test-raw-username", metadata={}),
Node(name="test-bronze-username", metadata={}),
Node(name="test-bronze-foo", metadata={}),
Node(name="test-raw-username", metadata={}),
}
assert dataplatform.graph().edges == [
Edge(
arrow="foo_flux",
source=Node(name="test-raw-username"),
target=Node(name="test-bronze-foo"),
metadata={},
),
Edge(
arrow="foo_flux",
source=Node(name="test-raw-recovery"),
target=Node(name="test-bronze-foo"),
metadata={},
),
Edge(
arrow="copy_flux",
source=Node(name="test-raw-username"),
target=Node(name="test-bronze-username"),
metadata={},
),
]
def test_content_from_graph_arguments(dataplatform: DataPlateform):
name_flux = lambda flux: f"flux-{flux.id}"
meta_flux = lambda flux: {"name": flux.name}
meta_table = lambda table: {"id": table.id, "partitions": table.partitions}
assert dataplatform.graph(
name_flux=name_flux, meta_flux=meta_flux, meta_table=meta_table
).nodes == {
Node(name="test-bronze-foo", metadata={"id": "test-bronze-foo", "partitions": []}),
Node(
name="test-raw-salary", metadata={"id": "test-raw-salary", "partitions": ["salary.pdf"]}
),
Node(
name="test-raw-recovery",
metadata={
"id": "test-raw-recovery",
"partitions": ["2022.csv", "2023.csv", "2024.csv"],
},
),
Node(
name="test-bronze-username", metadata={"id": "test-bronze-username", "partitions": []}
),
Node(
name="test-raw-username",
metadata={"id": "test-raw-username", "partitions": ["username.csv"]},
),
}
assert dataplatform.graph(
name_flux=name_flux, meta_flux=meta_flux, meta_table=meta_table
).edges == [
Edge(
arrow="flux-foo_flux",
source=Node(
name="test-raw-username",
metadata={"id": "test-raw-username", "partitions": ["username.csv"]},
),
target=Node(
name="test-bronze-foo", metadata={"id": "test-bronze-foo", "partitions": []}
),
metadata={"name": "foo"},
),
Edge(
arrow="flux-foo_flux",
source=Node(
name="test-raw-recovery",
metadata={
"id": "test-raw-recovery",
"partitions": ["2022.csv", "2023.csv", "2024.csv"],
},
),
target=Node(
name="test-bronze-foo", metadata={"id": "test-bronze-foo", "partitions": []}
),
metadata={"name": "foo"},
),
Edge(
arrow="flux-copy_flux",
source=Node(
name="test-raw-username",
metadata={"id": "test-raw-username", "partitions": ["username.csv"]},
),
target=Node(
name="test-bronze-username",
metadata={"id": "test-bronze-username", "partitions": []},
),
metadata={"name": "copy"},
),
]
def test_execute_flux(dataplatform: DataPlateform):
meta = dataplatform.execute_flux("foo_flux")
assert meta.data == {"who": "foo"}
assert dataplatform.repository("test").schema("test-bronze").tables == []
meta = dataplatform.execute_flux("copy_flux")
assert meta.data == {"src_size": 283, "tgt_size": 283}
assert dataplatform.repository("test").schema("test-bronze").tables == ["test-bronze-username"]


@@ -1,61 +0,0 @@
import shutil
from pathlib import Path
import pytest
from plesna.datastore.fs_datacatalogue import FSDataCatalogue
from plesna.models.storage import Schema
FIXTURE_DIR = Path(__file__).parent.parent / Path("./raw_datas/")
@pytest.fixture
def location(tmp_path):
loc = tmp_path
username_loc = loc / "username"
username_loc.mkdir()
salary_loc = loc / "salary"
salary_loc.mkdir()
example_src = FIXTURE_DIR
assert example_src.exists()
for f in example_src.glob("*"):
if "username" in str(f):
shutil.copy(f, username_loc)
else:
shutil.copy(f, salary_loc)
return loc
def test_init(location):
repo = FSDataCatalogue("example", location)
assert repo.ls() == [
"username",
"salary",
]
assert repo.ls(recursive=True) == [
"username",
"salary",
]
def test_list_schema(location):
repo = FSDataCatalogue("example", location)
assert repo.schemas == [".", "username", "salary"]
assert repo.schema(".").ref == Schema(id=".", value=".")
assert repo.schema("username").ref == Schema(id="username", value="username")
def test_list_tables_schema(location):
repo = FSDataCatalogue("example", location)
assert repo.schema(".").tables == []
assert repo.schema("username").tables == [
'username.csv',
'username-password-recovery-code.xlsx',
'username-password-recovery-code.xls',
]
assert repo.schema("salary").tables == ["salary.pdf"]


@@ -1,39 +0,0 @@
from pathlib import Path
import pytest
from plesna.dataplatform import DataPlateform
from plesna.datastore.fs_datacatalogue import FSDataCatalogue
FIXTURE_DIR = Path(__file__).parent / Path("raw_data")
@pytest.fixture
def raw_catalogue(tmp_path):
raw_path = Path(tmp_path) / "raw"
return FSDataCatalogue(raw_path)
@pytest.fixture
def bronze_catalogue(tmp_path):
bronze_path = Path(tmp_path) / "bronze"
return FSDataCatalogue(bronze_path)
@pytest.fixture
def silver_catalogue(tmp_path):
silver_path = Path(tmp_path) / "silver"
return FSDataCatalogue(silver_path)
@pytest.fixture
def dataplateform(
raw_catalogue: FSDataCatalogue,
bronze_catalogue: FSDataCatalogue,
silver_catalogue: FSDataCatalogue,
):
dp = DataPlateform()
dp.add_datacatalague("raw", raw_catalogue)
dp.add_datacatalague("bronze", bronze_catalogue)
dp.add_datacatalague("silver", silver_catalogue)
pass


@@ -1,6 +1,7 @@
 import pytest

-from plesna.graph.graph import Edge, Graph, Node
+from plesna.graph.graph import Graph
+from plesna.models.graphs import Edge, Node


 def test_append_nodess():
@@ -19,8 +20,8 @@ def test_append_edges():
     nodeB = Node(name="B")
     nodeC = Node(name="C")

-    edge1 = Edge(arrow_name="arrow", source=nodeA, target=nodeC)
-    edge2 = Edge(arrow_name="arrow", source=nodeB, target=nodeC)
+    edge1 = Edge(arrow="arrow", source=nodeA, target=nodeC)
+    edge2 = Edge(arrow="arrow", source=nodeB, target=nodeC)

     graph = Graph()
     graph.add_edge(edge1)
@@ -34,7 +35,7 @@ def test_init_edges_nodes():
     nodeB = Node(name="B")
     nodeC = Node(name="C")

-    edge1 = Edge(arrow_name="arrow", source=nodeB, target=nodeC)
+    edge1 = Edge(arrow="arrow", source=nodeB, target=nodeC)

     graph = Graph()
     graph.add_node(nodeA)
@@ -56,19 +57,19 @@ def nodes():
 @pytest.fixture
 def dag_edges(nodes):
     return {
-        "1": Edge(arrow_name="arrow", source=nodes["A"], target=nodes["C"]),
-        "2": Edge(arrow_name="arrow", source=nodes["B"], target=nodes["C"]),
-        "3": Edge(arrow_name="arrow", source=nodes["C"], target=nodes["D"]),
+        "1": Edge(arrow="arrow", source=nodes["A"], target=nodes["C"]),
+        "2": Edge(arrow="arrow", source=nodes["B"], target=nodes["C"]),
+        "3": Edge(arrow="arrow", source=nodes["C"], target=nodes["D"]),
     }


 @pytest.fixture
 def notdag_edges(nodes):
     return {
-        "1": Edge(arrow_name="arrow", source=nodes["A"], target=nodes["C"]),
-        "2": Edge(arrow_name="arrow", source=nodes["B"], target=nodes["C"]),
-        "3": Edge(arrow_name="arrow", source=nodes["C"], target=nodes["D"]),
-        "4": Edge(arrow_name="arrow", source=nodes["D"], target=nodes["B"]),
+        "1": Edge(arrow="arrow", source=nodes["A"], target=nodes["C"]),
+        "2": Edge(arrow="arrow", source=nodes["B"], target=nodes["C"]),
+        "3": Edge(arrow="arrow", source=nodes["C"], target=nodes["D"]),
+        "4": Edge(arrow="arrow", source=nodes["D"], target=nodes["B"]),
     }
@@ -94,9 +95,7 @@ def test_get_sources_from(nodes, dag_edges):
     assert graph.get_direct_sources_from(nodes["C"]) == set([nodes["A"], nodes["B"]])
     assert graph.get_direct_sources_from(nodes["D"]) == set([nodes["C"]])

-    assert graph.get_sources_from(nodes["D"]) == set(
-        [nodes["A"], nodes["B"], nodes["C"]]
-    )
+    assert graph.get_sources_from(nodes["D"]) == set([nodes["A"], nodes["B"], nodes["C"]])


 def test_valid_dage(dag_edges, notdag_edges):


@@ -1,18 +1,43 @@
-from plesna.graph.graph_set import EdgeOnSet, GraphSet, Node
+from plesna.graph.graph import Graph
+from plesna.graph.graph_set import GraphSet
+from plesna.models.graphs import Edge, EdgeOnSet, Node


 def test_init():
-    graph_set = GraphSet()
     nodeA = Node(name="A")
     nodeB = Node(name="B")
     nodeC = Node(name="C")

-    def arrow(sources, targets):
-        targets["C"].infos["res"] = sources["A"].name + sources["B"].name
-
-    edge1 = EdgeOnSet(
-        arrow=arrow, sources={"A": nodeA, "B": nodeB}, targets={"C": nodeC}
-    )
+    edge1 = EdgeOnSet(arrow="arrow", sources=[nodeA, nodeB], targets=[nodeC])

+    graph_set = GraphSet()
     graph_set.append(edge1)

     assert graph_set.node_sets == {frozenset([nodeA, nodeB]), frozenset([nodeC])}
+
+
+def test_to_graph():
+    graph_set = GraphSet()
+    nodeA = Node(name="A")
+    nodeB = Node(name="B")
+    nodeC = Node(name="C")
+    nodeD = Node(name="D")
+
+    edge1 = EdgeOnSet(arrow="arrow-AB-C", sources=[nodeA, nodeB], targets=[nodeC])
+    edge2 = EdgeOnSet(arrow="arrow-C-D", sources=[nodeC], targets=[nodeD])
+
+    graph_set.append(edge1)
+    graph_set.append(edge2)
+
+    graph = graph_set.to_graph()
+    assert graph.nodes == {
+        nodeA,
+        nodeB,
+        nodeC,
+        nodeD,
+    }
+    assert graph.edges == [
+        Edge(arrow="arrow-AB-C", source=nodeA, target=nodeC),
+        Edge(arrow="arrow-AB-C", source=nodeB, target=nodeC),
+        Edge(arrow="arrow-C-D", source=nodeC, target=nodeD),
+    ]

tests/libs/__init__.py (new, empty file)

@@ -0,0 +1,18 @@
import pytest
from plesna.libs.string_tools import StringToolsError, extract_values_from_pattern
def test_extract_values_from_pattern():
source = "id:truc-bidule-machin"
pattern = "id:{champ1}-{champ2}-machin"
assert extract_values_from_pattern(pattern, source) == {"champ1": "truc", "champ2": "bidule"}
def test_extract_values_from_pattern_no_match():
source = "id:truc-bidule"
pattern = "id:{champ1}-{champ2}-machin"
with pytest.raises(StringToolsError):
extract_values_from_pattern(pattern, source)


@@ -0,0 +1,3 @@
Identifier,One-time password
9012,12se74
2070,04ap67


@@ -0,0 +1,4 @@
Identifier,One-time password
9012,32ui83
9346,14ju73
5079,09ja61


@@ -0,0 +1,4 @@
Identifier,One-time password
9012,74iu23
2070,12io89
5079,85nc83


@@ -1,7 +0,0 @@
Username;Identifier;First name;Last name
booker12;9012;Rachel;Booker
grey07;2070;Laura;Grey
johnson81;4081;Craig;Johnson
jenkins46;9346;Mary;Jenkins
smith79;5079;Jamie;Smith


@@ -0,0 +1,6 @@
Username,Identifier,First name,Last name,Department,Location
booker12,9012,Rachel,Booker,Sales,Manchester
grey07,2070,Laura,Grey,Depot,London
johnson81,4081,Craig,Johnson,Depot,London
jenkins46,9346,Mary,Jenkins,Engineering,Manchester
smith79,5079,Jamie,Smith,Engineering,Manchester


@@ -0,0 +1,115 @@
import shutil
from pathlib import Path
import pytest
from plesna.storage.data_repository.fs_data_repository import FSDataRepository
FIXTURE_DIR = Path(__file__).parent.parent / Path("./raw_datas/")
@pytest.fixture
def location(tmp_path):
schema = tmp_path / "schema"
example_src = FIXTURE_DIR
assert example_src.exists()
shutil.copytree(src=example_src.absolute(), dst=schema.absolute())
return tmp_path
def test_init(location):
repo = FSDataRepository("example", "example", location)
assert repo.ls() == [
"schema",
]
assert repo.ls(dir="schema") == [
"username",
"recovery",
"salary",
]
assert repo.ls(recursive=True) == [
"schema",
"schema/username",
"schema/recovery",
"schema/salary",
"schema/username/username.csv",
"schema/recovery/2022.csv",
"schema/recovery/2023.csv",
"schema/recovery/2024.csv",
"schema/salary/salary.pdf",
]
@pytest.fixture
def repository(location) -> FSDataRepository:
return FSDataRepository("repo_id", "example", location)
def test_list_schemas(repository):
assert repository.schemas() == ["repo_id-schema"]
def test_describe_schema(location, repository):
schema = repository.schema("repo_id-schema")
assert schema.name == "schema"
assert schema.id == "repo_id-schema"
assert schema.repo_id == "repo_id"
assert schema.value == str(location / "schema")
assert schema.tables == [
"repo_id-schema-username",
"repo_id-schema-recovery",
"repo_id-schema-salary",
]
def test_list_tables_schema(repository):
assert repository.schema("repo_id-schema").tables == [
"repo_id-schema-username",
"repo_id-schema-recovery",
"repo_id-schema-salary",
]
assert repository.tables("repo_id-schema") == [
"repo_id-schema-username",
"repo_id-schema-recovery",
"repo_id-schema-salary",
]
assert repository.tables() == [
"repo_id-schema-username",
"repo_id-schema-recovery",
"repo_id-schema-salary",
]
def test_describe_table(location, repository):
table = repository.table("repo_id-schema-username")
assert table.id == "repo_id-schema-username"
assert table.repo_id == "repo_id"
assert table.schema_id == "repo_id-schema"
assert table.name == "username"
assert table.value == str(location / "schema" / "username")
assert table.partitions == ["username.csv"]
assert table.datas == [table.value + "/username.csv"]
def test_describe_table_with_partitions(location, repository):
table = repository.table("repo_id-schema-recovery")
assert table.id == "repo_id-schema-recovery"
assert table.repo_id == "repo_id"
assert table.schema_id == "repo_id-schema"
assert table.name == "recovery"
assert table.value == str(location / "schema" / "recovery")
assert table.partitions == [
"2022.csv",
"2023.csv",
"2024.csv",
]
assert table.datas == [
table.value + "/2022.csv",
table.value + "/2023.csv",
table.value + "/2024.csv",
]

View File

@@ -0,0 +1,182 @@
from datetime import datetime
import shutil
from pathlib import Path
import pytest
from plesna.models.flux import FluxMetaData
from plesna.storage.metadata_repository.fs_metadata_repository import FSMetaDataRepository
from plesna.storage.metadata_repository.metadata_repository import ExecutionLog, ModificationLog
@pytest.fixture
def location(tmp_path):
catalogpath = tmp_path / "catalog"
catalogpath.mkdir()
return catalogpath
def test_init(location):
repo = FSMetaDataRepository(location)
@pytest.fixture
def metadata_repository(location) -> FSMetaDataRepository:
return FSMetaDataRepository(location)
def test_add_flux(location, metadata_repository):
flux_id = "my_flux"
metadata_repository.add_flux(flux_id)
metadata_filepath = location / metadata_repository.OBJECTS["flux"]["filemodel"].format(
id=flux_id
)
assert metadata_filepath.exists()
with open(metadata_filepath, "r") as csvfile:
content = csvfile.read()
assert content == "datetime,output\n"
def test_add_and_list_fluxes(metadata_repository):
flux_ids = ["my_flux", "flux2", "blahblah"]
for f in flux_ids:
metadata_repository.add_flux(f)
assert metadata_repository.fluxes() == flux_ids
def test_register_flux_execution(location, metadata_repository):
flux_id = "my_flux"
metadata_repository.add_flux(flux_id)
metadata_repository.register_flux_execution(
flux_id,
datetime(2023, 3, 15, 14, 30),
output={
"truc": "machin",
},
)
metadata_filepath = location / metadata_repository.OBJECTS["flux"]["filemodel"].format(
id=flux_id
)
with open(metadata_filepath, "r") as csvfile:
content = csvfile.read()
assert (
content == 'datetime,output\n2023-03-15T14:30:00,"{""data"":{""truc"":""machin""}}"\n'
)
def test_register_and_get_exec_logs(metadata_repository):
flux_id = "my_flux"
metadata_repository.add_flux(flux_id)
metadata_repository.register_flux_execution(
flux_id,
datetime(2023, 3, 15, 14, 30),
output={"truc": "machin"},
)
metadata_repository.register_flux_execution(
flux_id,
datetime(2024, 3, 15, 14, 30),
output={
"truc": "chose",
},
)
logs = metadata_repository.flux_logs(flux_id)
assert logs == [
ExecutionLog(
datetime=datetime(2023, 3, 15, 14, 30),
output=FluxMetaData(data={"truc": "machin"}),
),
ExecutionLog(
datetime=datetime(2024, 3, 15, 14, 30),
output=FluxMetaData(data={"truc": "chose"}),
),
]
def test_register_and_get_last_exec_log(metadata_repository):
flux_id = "my_flux"
metadata_repository.add_flux(flux_id)
metadata_repository.register_flux_execution(
flux_id,
datetime(2023, 3, 15, 14, 30),
output={"truc": "machin"},
)
metadata_repository.register_flux_execution(
flux_id,
datetime(2024, 3, 15, 14, 30),
output={
"truc": "chose",
},
)
logs = metadata_repository.flux(flux_id)
assert logs == ExecutionLog(
datetime=datetime(2024, 3, 15, 14, 30),
output=FluxMetaData(data={"truc": "chose"}),
)
def test_add_and_list_tables(metadata_repository):
table_ids = ["my_table", "table2", "blahblah"]
for f in table_ids:
metadata_repository.add_table(f)
assert metadata_repository.tables() == table_ids
def test_register_table_modification(location, metadata_repository):
table_id = "my_table"
flux_id = "my_flux"
metadata_repository.add_table(table_id)
metadata_repository.register_table_modification(
table_id, datetime(2023, 3, 15, 14, 30), flux_id
)
metadata_filepath = location / metadata_repository.OBJECTS["table"]["filemodel"].format(
id=table_id
)
with open(metadata_filepath, "r") as csvfile:
content = csvfile.read()
assert content == "datetime,flux_id\n2023-03-15T14:30:00,my_flux\n"
def test_register_and_get_mod_logs(metadata_repository):
table_id = "my_table"
flux_id = "my_flux"
metadata_repository.add_table(table_id)
metadata_repository.register_table_modification(
table_id, datetime(2023, 3, 15, 14, 30), flux_id
)
metadata_repository.register_table_modification(
table_id, datetime(2024, 3, 15, 14, 30), flux_id
)
logs = metadata_repository.table_logs(table_id)
assert logs == [
ModificationLog(datetime=datetime(2023, 3, 15, 14, 30), flux_id=flux_id),
ModificationLog(datetime=datetime(2024, 3, 15, 14, 30), flux_id=flux_id),
]
def test_register_and_get_last_log(metadata_repository):
table_id = "my_table"
flux_id = "my_flux"
metadata_repository.add_table(table_id)
metadata_repository.register_table_modification(
table_id, datetime(2023, 3, 15, 14, 30), flux_id
)
metadata_repository.register_table_modification(
table_id, datetime(2024, 3, 15, 14, 30), flux_id
)
logs = metadata_repository.table(table_id)
assert logs == ModificationLog(datetime=datetime(2024, 3, 15, 14, 30), flux_id=flux_id)