Compare commits


No commits in common. "dags" and "main" have entirely different histories.
dags ... main

67 changed files with 71 additions and 2395 deletions

Makefile

@@ -0,0 +1,66 @@
DATA_BASE=./datas
PDF_BASE=$(DATA_BASE)/pdfs
PDF_YEARS=$(wildcard $(PDF_BASE)/*)
RAW_BASE=$(DATA_BASE)/raw
RAW_CRG=$(RAW_BASE)/CRG
RAW_CRG_YEARS=$(subst $(PDF_BASE), $(RAW_CRG), $(PDF_YEARS))
$(RAW_CRG)/%/: $(wildcard $(PDF_BASE)/%/*)
echo $(wildcard $(PDF_BASE)/$*/*)
@echo ----
ls $(PDF_BASE)/$*/
@echo ----
echo $*
@echo ----
echo $^
@echo ----
echo $?
#./datas/raw/CRG/%:
#pdf-oralia extract all --src $$year --dest $$(subst $$PDF_BASE, $$RAW_CRG, $$year)
# $(RAW_CRG_YEARS): $(PDF_PATHS)
# for year in $(PDF_PATHS); do \
# echo $$year; \
# echo $$(subst $$PDF_BASE, $$RAW_CRG, $$year); \
# echo "----"; \
# done;
extract_pdfs:
for year in 2021 2022 2023 2024; do \
mkdir -p $(RAW_CRG)/$$year/extracted;\
pdf-oralia extract all --src $(PDF_BASE)/$$year/ --dest $(RAW_CRG)/$$year/extracted; \
pdf-oralia join --src $(RAW_CRG)/$$year/extracted/ --dest $(RAW_CRG)/$$year/; \
done
clean_raw:
rm -rf ./PLESNA\ Compta\ SYSTEM/raw/**/*.csv
clean_built:
rm -rf $(DATA_BASE)/staging/**/*.csv
rm -rf $(DATA_BASE)/gold/**/*.csv
rm -rf $(DATA_BASE)/datamart/**/*.csv
rm -rf $(DATA_BASE)/datamart/**/*.xlsx
run_ingest:
python -m scripts ingest
run_feature:
python -m scripts feature
run_datamart:
python -m scripts datamart
build: clean_built run_ingest run_feature run_datamart
clean_all: clean_built clean_raw
import_nextcloud:
rsync -av ~/Nextcloud/PLESNA\ Compta\ SYSTEM/Histoire/ ./datas/Histoire
push_nextcloud:
rsync -av ./datas/datamart/ ~/Nextcloud/PLESNA\ Compta\ SYSTEM/DataMart

@@ -1,15 +1,5 @@
# E(T)LT for Plesna
## Installation
## Concepts
- `dataplatform`: aggregation of a data catalogue, a compute engine and the DAG of transformations (see the usage sketch below).
- `datacatalogue`: manages the content of the datastores.
- `datastore`: storage interface for the data.
- `compute`: processing engine for the fluxes.
- `graph/dag`: logical organisation of the fluxes and the data.
## Stages
- Raw: files kept as close as possible to their original form
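
The test files included in this comparison (`test_dataplatform.py`, `test_consume_flux.py`) exercise these concepts end to end. The sketch below is illustrative only: it assumes a `./datas` tree whose first-level directories are schemas and whose second-level entries are tables, with ids following the `{repo_id}-{schema}-{table}` convention used by `FSDataRepository`.

```python
from plesna.dataplatform import DataPlateform
from plesna.models.flux import Flux, Transformation
from plesna.storage.data_repository.fs_data_repository import FSDataRepository

# Repository: a directory tree (first level = schemas, second level = tables).
repo = FSDataRepository("repo", "example", "./datas")

dp = DataPlateform()
dp.add_repository(repo)

# A transformation receives the source and target Tables keyed by id
# and returns a metadata dict.
def copy(sources, targets, **kwargs):
    return {"sources": len(sources), "targets": len(targets)}

dp.add_flux(
    Flux(
        id="copy_flux",
        name="copy",
        sources=[repo.table("repo-raw-username")],    # illustrative ids
        targets=[repo.table("repo-bronze-username")],
        transformation=Transformation(function=copy),
    )
)
dp.execute_flux("copy_flux")  # returns FluxMetaData
```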

@@ -1,64 +0,0 @@
import dash
from dash import Dash, dcc, html
from .datalake import stages
from .pages import config, home, repository, schema, table
external_scripts = [{"src": "https://cdn.tailwindcss.com"}]
# external_script = ["https://tailwindcss.com/", {"src": "https://cdn.tailwindcss.com"}]
app = Dash(
__name__,
use_pages=True,
external_scripts=external_scripts,
suppress_callback_exceptions=True,
)
app.scripts.config.serve_locally = True
dash.register_page(
home.__name__,
path="/",
layout=home.layout,
)
dash.register_page(config.__name__, path="/config", layout=config.layout)
dash.register_page(
repository.__name__,
path_template="/repository/<repository_name>",
layout=repository.layout_factory(stages),
)
dash.register_page(
schema.__name__,
path_template="/stg/<repository_name>/schema/<schema_name>",
layout=schema.layout_factory(stages),
)
dash.register_page(
table.__name__,
path_template="/stg/<repository_name>/schm/<schema_name>/table/<table_name>",
layout=table.layout_factory(stages),
)
table.callback_factory(app)
app.layout = html.Div(
[
html.Div(
[
dcc.Link(
html.H1(
"Plesna",
),
href="/",
className="text-4xl p-4 text-center grow align-baseline",
),
dcc.Link(
"Config",
href="/config",
className="flex-none hover:bg-amber-100 p-4 align-middle",
),
],
className="bg-amber-300 flex flex-row shadow",
),
dash.page_container,
]
)
if __name__ == "__main__":
app.run(debug=True)

@@ -1,57 +0,0 @@
from dash import dcc, html
from ..libs.repository.repository import AbstractRepository
def html_list_schema(stage:AbstractRepository, with_tables=True):
""" Build html list of schema in stage """
ul_classes = "ml-2"
schema_baseurl = f"/stg/{stage.name}/schema/"
if with_tables:
return html.Ul(
[
html.Li(
children = [
dcc.Link(
schema,
href=schema_baseurl + schema,
className="text-lg hover:underline"
),
html_list_table(stage, schema)
],
className=""
) for schema in stage.schemas()
],
className=ul_classes
)
return html.Ul(
[
html.Li(
dcc.Link(
schema,
href=schema_baseurl + schema,
className="text-lg hover:underline"
),
) for schema in stage.schemas()
],
className=ul_classes
)
def html_list_table(stage:AbstractRepository, schema:str):
""" Build html list of table in stage """
table_baseurl = f"/stg/{stage.name}/schm/{schema}/table/"
return html.Ul(
[
html.Li(
dcc.Link(
table,
href=table_baseurl + table,
className="hover:underline"
),
) for table in stage.tables(schema=schema)
],
className="ml-4"
)

@@ -1,14 +0,0 @@
from dotenv import dotenv_values
from .libs.repository.fs_repository import FSRepository
env = {
**dotenv_values(".env"),
}
stages = {
"raw": FSRepository("raw", f"{env['DATA_PATH']}/{env['RAW_SUBPATH']}"),
"staging": FSRepository("staging", f"{env['DATA_PATH']}/{env['STAGING_SUBPATH']}"),
"gold": FSRepository("gold", f"{env['DATA_PATH']}/{env['GOLD_SUBPATH']}"),
"mart": FSRepository("mart", f"{env['DATA_PATH']}/{env['MART_SUBPATH']}"),
}

@@ -1,70 +0,0 @@
from collections.abc import Callable
from datetime import datetime
import pandas as pd
from pydantic import BaseModel
from ..repository.repository import AbstractRepository
class Schema(BaseModel):
repository: str
schema: str
class Table(BaseModel):
repository: str
schema: str
table: str
class Flux(BaseModel):
sources: list[Table]
destinations: dict[str, Table]
transformation: Callable[[list[pd.DataFrame]], dict[str, pd.DataFrame]]
class State(BaseModel):
statuses: dict[str, dict]
qty_out: int
failed_lines: list[str]
start: datetime
end: datetime
Repositories = dict[str, AbstractRepository]
def open_source(repositories: Repositories, source: Table) -> pd.DataFrame:
return repositories[source.repository].read(source.table, source.schema)
def write_source(
content: pd.DataFrame, repositories: Repositories, destination: Table
) -> str:
return repositories[destination.repository].write(
content, destination.table, destination.schema
)
def consume_flux(flux: Flux, repositories: dict[str, AbstractRepository]) -> State:
start = datetime.now()
src_dfs = [open_source(repositories, source) for source in flux.sources]
built_dfs = flux.transformation(src_dfs)
statuses = {
dest: write_source(df, repositories, flux.destinations[dest])
for dest, df in built_dfs.items()
}
end = datetime.now()
qty_out = 0
failed_lines = []
return State(
statuses=statuses,
qty_out=qty_out,
failed_lines=failed_lines,
start=start,
end=end,
)

@@ -1,86 +0,0 @@
from pathlib import Path
import pandas as pd
from .repository import AbstractRepository
ACCEPTABLE_EXTENTIONS = {
"csv": [".csv"],
"excel": [".xls", ".xlsx"],
}
class FSRepository(AbstractRepository):
def __init__(self, name, basepath, metadata_engine=None):
self.name = name
self.basepath = Path(basepath)
assert self.basepath.exists()
self._metadata_engine = metadata_engine
def ls(
self, dir="", only_files=False, only_directories=False, recursive=False
) -> list[str]:
dirpath = self.basepath / dir
if only_files:
return [
str(f.relative_to(dirpath))
for f in dirpath.iterdir()
if not f.is_dir() and not str(f).startswith(".")
]
if only_directories:
if recursive:
return [
str(f[0].relative_to(dirpath))
for f in dirpath.walk()
if not str(f).startswith(".")
]
return [
str(f.relative_to(dirpath))
for f in dirpath.iterdir()
if f.is_dir() and not str(f).startswith(".")
]
return [
str(f.relative_to(dirpath))
for f in dirpath.iterdir()
if not str(f).startswith(".")
]
def schemas(self, recursive=True) -> list[str]:
return self.ls("", only_directories=True, recursive=recursive)
def tables(self, schema: str = ".") -> list[str]:
return self.ls(schema, only_files=True)
def build_table_path(self, table: str, schema: str):
table_path = self.basepath
if schema == ".":
return table_path / table
return table_path / schema / table
def infos(self, table: str, schema: str = "."):
table_path = self.build_table_path(table, schema)
pass
def read(self, table: str, schema: str = ".", **read_options):
table_path = self.build_table_path(table, schema)
assert table_path.exists()
extension = table_path.suffix
if extension in ACCEPTABLE_EXTENTIONS["csv"]:
return pd.read_csv(table_path, **read_options)
if extension in ACCEPTABLE_EXTENTIONS["excel"]:
return pd.read_excel(table_path, engine = "openpyxl", **read_options)
raise ValueError("Bad extension. Can't open the table.")
def write(self, content, table: str, schema: str = "."):
table_path = self.build_table_path(table, schema)
pass
def delete_table(self, table: str, schema: str = "."):
table_path = self.build_table_path(table, schema)
pass

@@ -1,5 +0,0 @@
from abc import ABC
class AbstractMetadataEngine(ABC):
pass

@@ -1,37 +0,0 @@
import abc
from .metadata import AbstractMetadataEngine
class AbstractRepository(abc.ABC):
metadata_engine = AbstractMetadataEngine
@abc.abstractmethod
def schemas(self) -> list[str]:
"""List schemas"""
raise NotImplementedError
@abc.abstractmethod
def tables(self, schema) -> list[str]:
"""List table in schema"""
raise NotImplementedError
@abc.abstractmethod
def infos(self, table: str, schema: str) -> dict[str, str]:
"""Get infos about the table"""
raise NotImplementedError
@abc.abstractmethod
def read(self, table: str, schema: str):
"""Get content of the table"""
raise NotImplementedError
@abc.abstractmethod
def write(self, content, table: str, schema: str):
"""Write content into the table"""
raise NotImplementedError
@abc.abstractmethod
def delete_table(self, table: str, schema: str):
"""Delete the table"""
raise NotImplementedError

@@ -1,14 +0,0 @@
from dash import html
from dotenv import dotenv_values
import os
env = {
**dotenv_values(".env"),
**os.environ,
}
layout = html.Div([
html.H1('This is our Config page'),
html.Ul(children = [html.Li(f"{k} = {v}") for k,v in env.items()]),
])

@@ -1,27 +0,0 @@
from dash import dcc, html
from ..components.lists import html_list_schema
from ..datalake import stages
layout = html.Div([
html.Div(children=[
html.Ul(
children=[
html.Li(
children=[
dcc.Link(
stagename,
href=f"/stage/{stagename}",
className="text-2xl text-center p-2 bg-amber-100 rounded shadow"
),
html_list_schema(stage)
],
className="flex-1 bg-gray-100 rounded flex flex-col shadow"
) for stagename, stage in stages.items()
],
className="flex flex-row space-x-2"
)
],
className="w-full mt-4 px-2"
),
])

@@ -1,18 +0,0 @@
from dash import html
from ..components.lists import html_list_schema
from ..libs.repository.repository import AbstractRepository
def layout_factory(repositories: dict[str, AbstractRepository]):
def layout(repository_name: str = ""):
repository = repositories[repository_name]
return html.Div(
[
html.H2(f"{repository.name}", className="text-2xl p-4 py-2"),
html_list_schema(repository),
],
className="flex flex-col",
)
return layout

@@ -1,28 +0,0 @@
from dash import dcc, html
from ..libs.repository.repository import AbstractRepository
def layout_factory(repositories: dict[str, AbstractRepository]):
def layout(repository_name: str = "", schema_name: str = ""):
repository = repositories[repository_name]
return html.Div(
[
html.H2(
[
dcc.Link(
f"{repository.name}",
href=f"/repository/{repository.name}",
className="hover:underline",
),
html.Span(" > "),
html.Span(
f"{schema_name}",
),
],
className="text-2xl p-4 py-2",
),
]
)
return layout

@@ -1,130 +0,0 @@
from dash import Input, Output, State, dash_table, dcc, html, no_update
from dash.exceptions import PreventUpdate
from ..libs.repository.repository import AbstractRepository
def layout_factory(repositories: dict[str,AbstractRepository]):
def layout(repository_name:str="", schema_name:str="", table_name:str=""):
repository = repositories[repository_name]
df = repository.read(table=table_name, schema=schema_name)
return html.Div([
dcc.Store(id="table_backup"),
html.Div([
html.H2([
dcc.Link(
f"{repository.name}",
href=f"/repository/{repository.name}",
className="hover:underline"
),
html.Span(" > "),
dcc.Link(
f"{schema_name}",
href=f"/stg/{repository.name}/schema/{schema_name}",
className="hover:underline"
),
html.Span(" > "),
html.Span(table_name),
],
className="text-2xl"
),
html.Div([
html.Button(
"Editer",
id="btn_edit",
className="rounded border px-2 py-1",
style={"display": "block"}
),
html.Button(
"Sauver",
id="btn_save",
className="rounded border px-2 py-1 border-green-500 hover:bg-green-500",
style={"display": "none"}
),
html.Button(
"Annuler",
id="btn_cancel",
className="rounded border px-2 py-1 border-red-500 hover:bg-red-500",
style={"display": "none"}
),
],
className="flex flex-row space-x-2",
id="toolbar"
),
],
className="flex flex-row justify-between p-4"
),
html.Div([
html.Div([
dash_table.DataTable(
id="datatable",
data=df.to_dict('records'),
columns=[{"name": i, "id": i} for i in df.columns],
filter_action="native",
sort_action="native",
sort_mode="multi",
editable=False
)
])
],
className="overflow-y-auto"
),
],
className="p-2"
)
return layout
def callback_factory(app):
@app.callback(
Output("datatable", 'editable', allow_duplicate=True),
Output("table_backup", 'data'),
Input("btn_edit", "n_clicks"),
State("datatable", 'data'),
prevent_initial_call=True
)
def activate_editable(n_clicks, df_src):
if n_clicks is None:
raise PreventUpdate
if n_clicks > 0:
df_backup = df_src.copy()
return True, df_backup
raise PreventUpdate
@app.callback(
Output("datatable", 'editable', allow_duplicate=True),
Output("datatable", 'data', allow_duplicate=True),
Input("btn_cancel", "n_clicks"),
State("table_backup", 'data'),
prevent_initial_call=True
)
def cancel_modifications(n_clicks, data):
if n_clicks is None:
raise PreventUpdate
if n_clicks > 0 and data is not None:
return False, data.copy()
raise PreventUpdate
@app.callback(
Output("datatable", 'editable'),
Output("datatable", 'data'),
Input("btn_save", "n_clicks"),
State("datatable", 'editable'),
)
def save_modifications(n_clicks, editable):
if n_clicks is None:
raise PreventUpdate
if n_clicks > 0:
return not editable, no_update
return editable, no_update
@app.callback(
Output("btn_edit", "style"),
Output("btn_save", "style"),
Output("btn_cancel", "style"),
Input("datatable", "editable"),
)
def toolbar(editable):
if editable:
return {"display": "none"}, {"display": "block"}, {"display": "block"}
return {"display": "block"}, {"display": "none"}, {"display": "none"}

@@ -1,8 +0,0 @@
from plesna.models.flux import Flux, FluxMetaData
def consume_flux(flux: Flux) -> FluxMetaData:
metadata = flux.transformation.function(
sources=flux.sources_dict, targets=flux.targets_dict, **flux.transformation.extra_kwrds
)
return FluxMetaData(data=metadata)

@@ -1,93 +0,0 @@
from collections.abc import Callable
from plesna.compute.consume_flux import consume_flux
from plesna.graph.graph import Graph
from plesna.graph.graph_set import GraphSet
from plesna.models.flux import Flux, FluxMetaData
from plesna.models.graphs import Node
from plesna.models.libs.flux_graph import flux_to_edgeonset
from plesna.storage.data_repository.data_repository import DataRepository
class DataPlateformError(Exception):
pass
class DataPlateform:
def __init__(self):
self._metadata_engine = ""
self._fluxes = {}
self._repositories = {}
def add_repository(self, repository: DataRepository) -> str:
if repository.id in self._repositories:
raise DataPlateformError("The repository {repository.id} already exists")
self._repositories[repository.id] = repository
return repository.id
@property
def repositories(self) -> list[str]:
return list(self._repositories)
def repository(self, id: str) -> DataRepository:
return self._repositories[id]
def is_valid_flux(self, flux: Flux) -> bool:
return True
def add_flux(self, flux: Flux) -> str:
if flux.id in self._fluxes:
raise DataPlateformError("The flux {flux} already exists")
assert self.is_valid_flux(flux)
self._fluxes[flux.id] = flux
return flux.id
@property
def fluxes(self) -> list[str]:
return list(self._fluxes)
def flux(self, flux_id: str) -> Flux:
return self._fluxes[flux_id]
def execute_flux(self, flux_id: str) -> FluxMetaData:
if flux_id not in self._fluxes:
raise DataPlateformError("The flux {flux_id} is not registered")
return consume_flux(self._fluxes[flux_id])
def graphset(
self,
name_flux: Callable = lambda flux: flux.id,
meta_flux: Callable = lambda _: {},
name_table: Callable = lambda table: table.id,
meta_table: Callable = lambda _: {},
) -> GraphSet:
graphset = GraphSet()
for flux in self._fluxes.values():
edge = flux_to_edgeonset(flux, name_flux, meta_flux, name_table, meta_table)
graphset.append(edge)
return graphset
def graph(
self,
name_flux: Callable = lambda flux: flux.id,
meta_flux: Callable = lambda _: {},
name_table: Callable = lambda table: table.id,
meta_table: Callable = lambda _: {},
) -> Graph:
"""Get the graph of fluxes and tables
:param name_flux: function on flux to name the edge
:param meta_flux: function on flux to attribute metadata to edge
:param name_table: function on table to name nodes
:param meta_table: function on flux to attribute metadata to nodes
"""
graph = self.graphset(name_flux, meta_flux, name_table, meta_table).to_graph()
for repo in self._repositories.values():
for schema in repo.schemas():
for table in repo.tables(schema):
t = repo.table(table)
graph.add_node(Node(name=name_table(t), metadata=meta_table(t)))
return graph

@@ -1,83 +0,0 @@
from typing import Set
from pydantic import BaseModel
from functools import reduce
from plesna.models.graphs import Node, Edge
class Graph:
def __init__(self, nodes: list[Node] = [], edges: list[Edge] = []):
self._edges: list[Edge] = []
self._nodes: Set[Node] = set()
self.add_edges(edges)
self.add_nodes(nodes)
def add_node(self, node: Node):
self._nodes.add(node)
def add_nodes(self, nodes: list[Node]):
for node in nodes:
self.add_node(node)
def add_edge(self, edge: Edge):
self._edges.append(edge)
self.add_node(edge.source)
self.add_node(edge.target)
def add_edges(self, edges: list[Edge]):
for edge in edges:
self.add_edge(edge)
@property
def nodes(self):
return self._nodes
@property
def edges(self):
return self._edges
def get_edges_from(self, node: Node) -> list[Edge]:
"""Get all edges which have the node as source"""
return [edge for edge in self._edges if edge.source == node]
def get_edges_to(self, node: Node) -> list[Edge]:
"""Get all edges which have the node as target"""
return [edge for edge in self._edges if edge.target == node]
def get_direct_targets_from(self, node: Node) -> set[Node]:
"""Get direct nodes that are accessible from the node"""
return set(edge.target for edge in self._edges if edge.source == node)
def get_targets_from(self, node: Node) -> set[Node]:
"""Get all nodes that are accessible from the node
If the graph has a loop, the procedure will end up in an infinite loop!
"""
direct_targets = self.get_direct_targets_from(node)
undirect_targets = [self.get_targets_from(n) for n in direct_targets]
undirect_targets = reduce(lambda x, y: x.union(y), undirect_targets, set())
return direct_targets.union(undirect_targets)
def get_direct_sources_from(self, node: Node) -> set[Node]:
"""Get direct nodes that are targeted the node"""
return set(edge.source for edge in self._edges if edge.target == node)
def get_sources_from(self, node: Node) -> set[Node]:
"""Get all nodes that are targeted the node"""
direct_sources = self.get_direct_sources_from(node)
undirect_sources = [self.get_sources_from(n) for n in direct_sources]
undirect_sources = reduce(lambda x, y: x.union(y), undirect_sources, set())
return direct_sources.union(undirect_sources)
def is_dag(self) -> bool:
visited = set()
for node in self._nodes:
if node not in visited:
try:
targets = self.get_targets_from(node)
except RecursionError:
return False
visited = visited.union(targets)
return True

@@ -1,39 +0,0 @@
from typing import Set
from plesna.graph.graph import Graph
from plesna.models.graphs import Edge, EdgeOnSet
from itertools import product
class GraphSet:
def __init__(self):
self._edges = []
self._node_sets = set()
def append(self, edge: EdgeOnSet):
self._edges.append(edge)
self._node_sets.add(frozenset(edge.sources))
self._node_sets.add(frozenset(edge.targets))
@property
def edges(self) -> Set[EdgeOnSet]:
return self._edges
@property
def node_sets(self) -> Set[frozenset]:
return self._node_sets
def to_graph(self) -> Graph:
graph = Graph()
for node_set in self.node_sets:
graph.add_nodes(node_set)
for edge in self._edges:
flatten_edge = [
Edge(arrow=edge.arrow, source=s, target=t, metadata=edge.metadata)
for (s, t) in product(edge.sources, edge.targets)
]
graph.add_edges(flatten_edge)
return graph
def is_valid_dag(self) -> bool:
return self.to_graph().is_dag()

@@ -1,18 +0,0 @@
import re
class StringToolsError(ValueError):
pass
def extract_values_from_pattern(pattern, string):
regex = re.sub(r"{(.+?)}", r"(?P<_\1>.+)", pattern)
search = re.search(regex, string)
if search:
values = list(search.groups())
keys = re.findall(r"{(.+?)}", pattern)
_dict = dict(zip(keys, values))
return _dict
raise StringToolsError(f"Can't parse '{string}' with the pattern '{pattern}'")
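# Illustrative usage (mirroring the test in this comparison):
#   extract_values_from_pattern("id:{champ1}-{champ2}-machin", "id:truc-bidule-machin")
#   -> {"champ1": "truc", "champ2": "bidule"}
# A string that does not match the pattern raises StringToolsError.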

@@ -1,48 +0,0 @@
from collections.abc import Callable
from pydantic import BaseModel, computed_field
from plesna.models.storage import Table
class Transformation(BaseModel):
"""
The function has to have at least 2 arguments: sources and targets.
Other arguments will come through extra_kwrds.
The function has to return its metadata as a dict.
"""
function: Callable
extra_kwrds: dict = {}
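# Illustrative example (not part of the original module): a function matching
# this contract receives the source and target Tables keyed by their id and
# returns a plain dict, which consume_flux wraps into FluxMetaData:
#
#     def copy(sources, targets, separator=","):
#         ...
#         return {"rows": 42}
#
#     Transformation(function=copy, extra_kwrds={"separator": ";"})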
class Flux(BaseModel):
id: str
name: str
sources: list[Table]
targets: list[Table]
transformation: Transformation
@computed_field
@property
def sources_dict(self) -> dict[str, Table]:
return {s.id: s for s in self.sources}
@computed_field
@property
def sources_id(self) -> list[str]:
return [s.id for s in self.sources]
@computed_field
@property
def targets_id(self) -> list[str]:
return [s.id for s in self.targets]
@computed_field
@property
def targets_dict(self) -> dict[str, Table]:
return {s.id: s for s in self.targets}
class FluxMetaData(BaseModel):
data: dict

@@ -1,23 +0,0 @@
from pydantic import BaseModel
class Node(BaseModel):
name: str
metadata: dict = {}
def __hash__(self):
return hash(self.name)
class Edge(BaseModel):
arrow: str
source: Node
target: Node
metadata: dict = {}
class EdgeOnSet(BaseModel):
arrow: str
sources: list[Node]
targets: list[Node]
metadata: dict = {}

@@ -1,29 +0,0 @@
from collections.abc import Callable
from plesna.models.flux import Flux
from plesna.models.graphs import EdgeOnSet, Node
def flux_to_edgeonset(
flux: Flux,
name_flux: Callable = lambda flux: flux.id,
meta_flux: Callable = lambda _: {},
name_table: Callable = lambda table: table.id,
meta_table: Callable = lambda _: {},
) -> EdgeOnSet:
"""Convert a flux to an EdgeOnSet
:param flux: the flux
:name_flux: function on flux which returns the name of the arrow from flux
:meta_flux: function on flux which returns a dict to store in metadata field
:name_table: function on table which returns the name of node
:meta_table: function on table which returns metadata of node
"""
sources = [Node(name=name_table(s), metadata=meta_table(s)) for s in flux.sources]
targets = [Node(name=name_table(s), metadata=meta_table(s)) for s in flux.targets]
return EdgeOnSet(
arrow=name_flux(flux),
sources=sources,
targets=targets,
metadata=meta_flux(flux),
)

@@ -1,61 +0,0 @@
from pydantic import BaseModel
class Schema(BaseModel):
"""Where multiple tables are stored
id: unique identifier for the schema
repo_id: id of the repo the schema belongs to
name: name of the schema
value: string which describes where to find the schema in the repository
"""
id: str
repo_id: str
name: str
value: str
tables: list[str] = []
class Table(BaseModel):
"""Place where same structured data are stored
id: unique identifier for the table
repo_id: id of the repo the table belongs to
schema_id: id of the schema the table belongs to
name: the name of the table
value: string which describes where to find the table in the storage system
partitions: list of partitions
datas: list of strings to access the data
"""
id: str
repo_id: str
schema_id: str
name: str
value: str
datas: list[str]
partitions: list[str] = []
metadata: dict = {}
class Partition(BaseModel):
"""Place where data are stored
id: unique identifier for the partition
repo_id: id of the repo the partition belongs to
schema_id: id of the schema the partition belongs to
table_id: id of the table the partition belongs to
name: the name of the partition
value: string which describes where to find the partition in the storage system
"""
id: str
repo_id: str
schema_id: str
table_id: str
name: str
value: str

@@ -1,37 +0,0 @@
import abc
from plesna.models.storage import Partition, Schema, Table
class DataRepository:
def __init__(self, id: str, name: str):
self._id = id
self._name = name
@property
def id(self) -> str:
return self._id
@property
def name(self) -> str:
return self._name
@abc.abstractmethod
def schemas(self) -> list[str]:
"""List schema's ids"""
raise NotImplementedError
@abc.abstractmethod
def schema(self, schema_id: str) -> Schema:
"""Get the schema properties"""
raise NotImplementedError
@abc.abstractmethod
def tables(self, schema_id: str) -> list[str]:
"""List table's name in schema (the id)"""
raise NotImplementedError
@abc.abstractmethod
def table(self, table_id: str) -> Table:
"""Get the table properties (the id)"""
raise NotImplementedError

@@ -1,197 +0,0 @@
from pathlib import Path
from pydantic import BaseModel, computed_field
from plesna.libs.string_tools import extract_values_from_pattern
from plesna.models.storage import Schema, Table
from plesna.storage.data_repository.data_repository import DataRepository
class FSTable(BaseModel):
name: str
repo_id: str
schema_id: str
id: str
path: Path
is_partitionned: bool
partitions: list[str] = []
@computed_field
@property
def ref(self) -> Table:
if self.is_partitionned:
datas = [str(self.path.absolute() / p) for p in self.partitions]
else:
datas = [str(self.path.absolute())]
return Table(
id=self.id,
repo_id=self.repo_id,
schema_id=self.schema_id,
name=self.name,
value=str(self.path.absolute()),
partitions=self.partitions,
datas=datas,
)
class FSSchema(BaseModel):
name: str
repo_id: str
id: str
path: Path
tables: list[str]
@computed_field
@property
def ref(self) -> Schema:
return Schema(
id=self.id,
repo_id=self.repo_id,
name=self.name,
value=str(self.path.absolute()),
tables=self.tables,
)
class FSRepositoryError(ValueError):
pass
class FSDataRepository(DataRepository):
"""Data Repository based on files tree structure
- first level: schemas
- second level: tables
- third level: partition (actual datas)
"""
ID_FMT = {
"schema": "{repo_id}-{schema_name}",
"table": "{schema_id}-{table_name}",
}
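# e.g. schema id "repo-raw" and table id "repo-raw-username" for a repository
# whose id is "repo" (illustrative values, see the tests in this comparison).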
def __init__(self, id: str, name: str, basepath: str):
super().__init__(id, name)
self._basepath = Path(basepath)
assert self._basepath.exists()
def ls(self, dir="", only_files=False, only_directories=False, recursive=False) -> list[str]:
"""List files in dir
:param dir: relative path from self._basepath
:param only_files: if true return only files
:param only_directories: if true return only directories
:param recursive: list content recursively (only for)
:return: list of string describing path from self._basepath / dir
"""
dirpath = self._basepath / dir
if recursive:
paths = dirpath.rglob("*")
else:
paths = dirpath.iterdir()
if only_files:
return [
str(f.relative_to(dirpath))
for f in paths
if not f.is_dir() and not str(f).startswith(".")
]
if only_directories:
return [
str(f.relative_to(dirpath))
for f in paths
if f.is_dir() and not str(f).startswith(".")
]
return [str(f.relative_to(dirpath)) for f in paths if not str(f).startswith(".")]
def parse_id(self, string: str, id_type: str) -> dict:
if id_type not in self.ID_FMT:
raise FSRepositoryError(
"Wrong id_type. Gots {id_type} needs to be one of {self.ID_FMT.values}"
)
parsed = extract_values_from_pattern(self.ID_FMT[id_type], string)
if not parsed:
raise FSRepositoryError(
f"Wrong format for {id_type}. Got {string} need {self.ID_FMT['id_type']}"
)
return parsed
def schemas(self) -> list[str]:
"""List schemas (sub directories within basepath)"""
subdirectories = self.ls("", only_directories=True)
return [
self.ID_FMT["schema"].format(repo_id=self.id, schema_name=d) for d in subdirectories
]
def _schema(self, schema_id: str) -> FSSchema:
"""List schemas (sub directories within basepath)"""
parsed = self.parse_id(schema_id, "schema")
repo_id = parsed["repo_id"]
schema_name = parsed["schema_name"]
schema_path = self._basepath / schema_name
if repo_id != self.id:
raise FSRepositoryError("Trying to get schema that don't belong in this repository")
tables = self.tables(schema_id)
return FSSchema(
name=schema_name,
id=schema_id,
repo_id=self.id,
schema_id=schema_id,
path=schema_path,
tables=tables,
)
def schema(self, schema_id: str) -> Schema:
return self._schema(schema_id).ref
def _tables(self, schema_id: str) -> list[str]:
parsed = self.parse_id(schema_id, "schema")
tables = self.ls(parsed["schema_name"])
return [self.ID_FMT["table"].format(table_name=t, schema_id=schema_id) for t in tables]
def tables(self, schema_id: str = "") -> list[str]:
if schema_id:
return self._tables(schema_id)
tables = []
for schema in self.schemas():
tables += self._tables(schema)
return tables
def _table(self, table_id: str) -> FSTable:
"""Get infos on the table"""
parsed = self.parse_id(table_id, "table")
schema = self._schema(parsed["schema_id"])
if not schema.path.exists():
raise FSRepositoryError(f"The schema {schema.id} does not exists.")
table_subpath = f"{schema.name}/{parsed['table_name']}"
table_path = self._basepath / table_subpath
is_partitionned = table_path.is_dir()
if is_partitionned:
partitions = self.ls(table_subpath, only_files=True)
else:
partitions = []
return FSTable(
name=parsed["table_name"],
id=table_id,
repo_id=self.id,
schema_id=schema.id,
path=table_path,
is_partitionned=is_partitionned,
partitions=partitions,
)
def table(self, table_id: str) -> Table:
return self._table(table_id).ref

@@ -1,24 +0,0 @@
import abc
from plesna.models.storage import Schema
class DataCatalogue:
def __init__(self):
pass
@property
@abc.abstractmethod
def schemas(self) -> list[str]:
"""List schema's names"""
raise NotImplementedError
@abc.abstractmethod
def schema(self, name: str) -> Schema:
"""Get the schema properties"""
raise NotImplementedError
@abc.abstractmethod
def tables(self, schema: str) -> list[str]:
"""List table's name in schema"""
raise NotImplementedError

@@ -1,132 +0,0 @@
from pathlib import Path
from datetime import datetime
import csv
import json
from typing import Iterable
from plesna.libs.string_tools import StringToolsError, extract_values_from_pattern
from plesna.storage.metadata_repository.metadata_repository import (
ExecutionLog,
MetaDataRepository,
ModificationLog,
)
class FSMetaDataRepositoryError(ValueError):
pass
class FSMetaDataRepository(MetaDataRepository):
"""MetaData Repository based on csv files
File organisation: executions and modifications are stored in csv files named according to the 'filemodel' patterns in OBJECTS
"""
OBJECTS = {
"flux": {"filemodel": "{id}_execution.csv", "logmodel": ExecutionLog},
"table": {"filemodel": "{id}_execution.csv", "logmodel": ModificationLog},
}
def __init__(self, basepath: str):
super().__init__()
self._basepath = Path(basepath)
assert self._basepath.exists()
def get_things(self, what: str) -> list[str]:
"""List all ids for 'what'"""
whats = []
for filepath in self._basepath.iterdir():
try:
founded = extract_values_from_pattern(
self.OBJECTS[what]["filemodel"], filepath.name
)
except StringToolsError:
pass
else:
whats.append(founded["id"])
return whats
def fluxes(self) -> list[str]:
"""List fluxes's ids"""
return self.get_things(what="flux")
def tables(
self,
) -> list[str]:
"""List all table's ids"""
return self.get_things(what="table")
def _add_thing(self, what: str, id: str) -> str:
"""Add the new things 'what'"""
filepath = self._basepath / self.OBJECTS[what]["filemodel"].format(id=id)
filepath.touch()
with open(filepath, "a") as csvfile:
writer = csv.DictWriter(
csvfile, fieldnames=self.OBJECTS[what]["logmodel"].model_fields.keys()
)
writer.writeheader()
return id
def add_flux(self, flux_id: str) -> str:
"""Get the flux metadata"""
return self._add_thing(what="flux", id=flux_id)
def add_table(self, table_id: str) -> str:
"""Get the table metadata"""
return self._add_thing(what="table", id=table_id)
def _register_things_event(self, what: str, id: str, dt: datetime, event: dict) -> ExecutionLog:
filepath = self._basepath / self.OBJECTS[what]["filemodel"].format(id=id)
if not filepath.exists():
raise FSMetaDataRepositoryError(f"The {what} {id} hasn't been added yet.")
metadata_ = self.OBJECTS[what]["logmodel"](datetime=dt, **event)
with open(filepath, "a") as csvfile:
writer = csv.DictWriter(
csvfile, fieldnames=self.OBJECTS[what]["logmodel"].model_fields.keys()
)
writer.writerow(metadata_.to_flat_dict())
return metadata_
def register_flux_execution(self, flux_id: str, dt: datetime, output: dict) -> ExecutionLog:
"""Get the flux metadata"""
return self._register_things_event("flux", flux_id, dt, {"output": {"data": output}})
def register_table_modification(self, table_id: str, dt: datetime, flux_id: str) -> str:
"""Get the table metadata"""
return self._register_things_event("table", table_id, dt, {"flux_id": flux_id})
def _get_all_log(self, what: str, id: str) -> Iterable[dict]:
"""Generate log dict from history"""
filepath = self._basepath / self.OBJECTS[what]["filemodel"].format(id=id)
if not filepath.exists():
raise FSMetaDataRepositoryError(f"The {what} {id} hasn't been added yet.")
with open(filepath, "r") as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
yield row
def flux_logs(self, flux_id: str) -> list[ExecutionLog]:
"""Get all flux logs"""
logs = []
for logline in self._get_all_log("flux", flux_id):
logline["output"] = json.loads(logline["output"])
logs.append(self.OBJECTS["flux"]["logmodel"](**logline))
return logs
def flux(self, flux_id: str) -> ExecutionLog:
"""Get the last flux log"""
return max(self.flux_logs(flux_id), key=lambda l: l.datetime)
def table_logs(self, table_id: str) -> list[ModificationLog]:
"""Get all table's modification metadatas"""
return [ModificationLog(**log) for log in self._get_all_log("table", table_id)]
def table(self, table_id: str) -> ModificationLog:
"""Get the last table's modification metadatas"""
return max(self.table_logs(table_id), key=lambda l: l.datetime)

@@ -1,81 +0,0 @@
import abc
from datetime import datetime
from pydantic import BaseModel
from plesna.models.flux import FluxMetaData
class ModificationLog(BaseModel):
datetime: datetime
flux_id: str
def to_flat_dict(self):
return {"datetime": self.datetime.isoformat(), "flux_id": self.flux_id}
class ExecutionLog(BaseModel):
datetime: datetime
output: FluxMetaData
def to_flat_dict(self):
return {"datetime": self.datetime.isoformat(), "output": self.output.model_dump_json()}
class MetaDataRepository:
"""Object that stores metadata about flux, schema, tables"""
def __init__(self):
pass
@abc.abstractmethod
def fluxes(self) -> list[str]:
"""List fluxes's ids"""
raise NotImplementedError
@abc.abstractmethod
def add_flux(self, flux_id: str) -> str:
"""Get the flux metadata"""
raise NotImplementedError
@abc.abstractmethod
def register_flux_execution(self, flux_id: str, dt: datetime, metadata: dict) -> str:
"""Get the flux metadata"""
raise NotImplementedError
@abc.abstractmethod
def flux(self, flux_id: str) -> ExecutionLog:
"""Get the flux last execution metadata"""
raise NotImplementedError
@abc.abstractmethod
def flux_logs(self, flux_id: str) -> list[ExecutionLog]:
"""Get all the flux execution metadata"""
raise NotImplementedError
@abc.abstractmethod
def tables(
self,
) -> list[str]:
"""List all table's ids"""
raise NotImplementedError
@abc.abstractmethod
def add_table(self, table_id: str) -> str:
"""Get the table metadata"""
raise NotImplementedError
@abc.abstractmethod
def register_table_modification(self, table_id: str, dt: datetime, metadata: dict) -> str:
"""Get the table metadata"""
raise NotImplementedError
@abc.abstractmethod
def table(self, table_id: str) -> ModificationLog:
"""Get the last table's modification metadatas"""
raise NotImplementedError
@abc.abstractmethod
def table_logs(self, table_id: str) -> list[ModificationLog]:
"""Get all table's modification metadatas"""
raise NotImplementedError

@@ -1,16 +0,0 @@
[project]
name = "plesna"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"ruff>=0.8.5",
]
[tool.ruff]
line-length = 100
indent-width = 4
[tool.ruff.lint]
select = ["E", "F"]
ignore = ["F401"]

@@ -1,6 +1,7 @@
jupyter==1.0.0
pandas==2.2.2
pydantic==2.8.2
pandas==1.5.0
pdf-oralia==0.3.11
pydantic==2.6.1
click==8.1.7
openpyxl==3.1.5
xlrd==2.0.1
dlt[duckdb]>=0.4.3a0
openpyxl>=3.0.0

@@ -1,36 +0,0 @@
from plesna.compute.consume_flux import consume_flux
from plesna.models.flux import Flux, Transformation
from plesna.models.storage import Table
def test_consume_flux():
sources = [
Table(id="src1", repo_id="test", schema_id="test", name="test", value="here", datas=["d"]),
Table(id="src2", repo_id="test", schema_id="test", name="test", value="here", datas=["d"]),
]
targets = [
Table(id="tgt1", repo_id="test", schema_id="test", name="test", value="this", datas=["d"]),
Table(id="tgt2", repo_id="test", schema_id="test", name="test", value="that", datas=["d"]),
]
def func(sources, targets, **kwrds):
return {
"sources": len(sources),
"targets": len(targets),
"kwrds": len(kwrds),
}
flux = Flux(
id="flux",
name="flux",
sources=sources,
targets=targets,
transformation=Transformation(function=func, extra_kwrds={"extra": "super"}),
)
meta = consume_flux(flux)
assert meta.data == {
"sources": 2,
"targets": 2,
"kwrds": 1,
}

@@ -1,280 +0,0 @@
import shutil
from pathlib import Path
import pytest
from plesna.dataplatform import DataPlateform
from plesna.models.graphs import Edge, EdgeOnSet, Node
from plesna.models.flux import Flux, Transformation
from plesna.storage.data_repository.fs_data_repository import FSDataRepository
FIXTURE_DIR = Path(__file__).parent.parent / Path("raw_datas")
@pytest.fixture
def repository(tmp_path) -> FSDataRepository:
example_src = FIXTURE_DIR
assert example_src.exists()
raw_path = Path(tmp_path) / "raw"
shutil.copytree(src=example_src.absolute(), dst=raw_path.absolute())
bronze_path = Path(tmp_path) / "bronze"
bronze_path.mkdir()
silver_path = Path(tmp_path) / "silver"
silver_path.mkdir()
return FSDataRepository("test", "test", tmp_path)
def test_add_repository(
repository: FSDataRepository,
):
dp = DataPlateform()
dp.add_repository(repository)
assert dp.repositories == ["test"]
assert dp.repository("test") == repository
@pytest.fixture
def copy_flux(repository: FSDataRepository) -> Flux:
raw_username = [repository.table("test-raw-username")]
bronze_username = [repository.table("test-bronze-username")]
def copy(sources, targets):
src_path = Path(sources["test-raw-username"].datas[0])
tgt_path = Path(targets["test-bronze-username"].datas[0])
shutil.copy(src_path, tgt_path)
return {"src_size": src_path.stat().st_size, "tgt_size": tgt_path.stat().st_size}
extra_kwrds = {}
raw_brz_copy_username = Flux(
id="copy_flux",
name="copy",
sources=raw_username,
targets=bronze_username,
transformation=Transformation(function=copy, extra_kwrds=extra_kwrds),
)
return raw_brz_copy_username
@pytest.fixture
def foo_flux(repository: FSDataRepository) -> Flux:
src = [
repository.table("test-raw-username"),
repository.table("test-raw-recovery"),
]
targets = [repository.table("test-bronze-foo")]
def foo(sources, targets):
return {"who": "foo"}
extra_kwrds = {}
flux = Flux(
id="foo_flux",
name="foo",
sources=src,
targets=targets,
transformation=Transformation(function=foo, extra_kwrds=extra_kwrds),
)
return flux
def test_add_flux(repository: FSDataRepository, copy_flux: Flux, foo_flux: Flux):
dataplatform = DataPlateform()
dataplatform.add_repository(repository)
dataplatform.add_flux(flux=copy_flux)
assert dataplatform.fluxes == ["copy_flux"]
dataplatform.add_flux(flux=foo_flux)
assert dataplatform.fluxes == ["copy_flux", "foo_flux"]
assert dataplatform.flux("copy_flux") == copy_flux
assert dataplatform.flux("foo_flux") == foo_flux
@pytest.fixture
def dataplatform(
repository: FSDataRepository,
foo_flux: Flux,
copy_flux: Flux,
) -> DataPlateform:
dp = DataPlateform()
dp.add_repository(repository)
dp.add_flux(foo_flux)
dp.add_flux(copy_flux)
return dp
def test_listing_content(dataplatform: DataPlateform):
assert dataplatform.repository("test").schemas() == ["test-raw", "test-bronze", "test-silver"]
assert dataplatform.repository("test").schema("test-raw").tables == [
"test-raw-username",
"test-raw-recovery",
"test-raw-salary",
]
assert dataplatform.repository("test").table("test-raw-username").partitions == ["username.csv"]
assert dataplatform.repository("test").table("test-raw-recovery").partitions == [
"2022.csv",
"2023.csv",
"2024.csv",
]
def test_content_from_graphset(dataplatform: DataPlateform):
assert dataplatform.graphset().node_sets == {
frozenset(
{
Node(name="test-bronze-username"),
}
),
frozenset(
{
Node(name="test-bronze-foo"),
}
),
frozenset(
{
Node(name="test-raw-username"),
}
),
frozenset(
{
Node(name="test-raw-username"),
Node(name="test-raw-recovery"),
}
),
}
assert dataplatform.graphset().edges == [
EdgeOnSet(
arrow="foo_flux",
sources=[Node(name="test-raw-username"), Node(name="test-raw-recovery")],
targets=[Node(name="test-bronze-foo")],
metadata={},
),
EdgeOnSet(
arrow="copy_flux",
sources=[Node(name="test-raw-username")],
targets=[Node(name="test-bronze-username")],
metadata={},
),
]
def test_content_from_graph(dataplatform: DataPlateform):
assert dataplatform.graph().nodes == {
Node(name="test-raw-recovery", metadata={}),
Node(name="test-raw-salary", metadata={}),
Node(name="test-raw-username", metadata={}),
Node(name="test-bronze-username", metadata={}),
Node(name="test-bronze-foo", metadata={}),
Node(name="test-raw-username", metadata={}),
}
assert dataplatform.graph().edges == [
Edge(
arrow="foo_flux",
source=Node(name="test-raw-username"),
target=Node(name="test-bronze-foo"),
metadata={},
),
Edge(
arrow="foo_flux",
source=Node(name="test-raw-recovery"),
target=Node(name="test-bronze-foo"),
metadata={},
),
Edge(
arrow="copy_flux",
source=Node(name="test-raw-username"),
target=Node(name="test-bronze-username"),
metadata={},
),
]
def test_content_from_graph_arguments(dataplatform: DataPlateform):
name_flux = lambda flux: f"flux-{flux.id}"
meta_flux = lambda flux: {"name": flux.name}
meta_table = lambda table: {"id": table.id, "partitions": table.partitions}
assert dataplatform.graph(
name_flux=name_flux, meta_flux=meta_flux, meta_table=meta_table
).nodes == {
Node(name="test-bronze-foo", metadata={"id": "test-bronze-foo", "partitions": []}),
Node(
name="test-raw-salary", metadata={"id": "test-raw-salary", "partitions": ["salary.pdf"]}
),
Node(
name="test-raw-recovery",
metadata={
"id": "test-raw-recovery",
"partitions": ["2022.csv", "2023.csv", "2024.csv"],
},
),
Node(
name="test-bronze-username", metadata={"id": "test-bronze-username", "partitions": []}
),
Node(
name="test-raw-username",
metadata={"id": "test-raw-username", "partitions": ["username.csv"]},
),
}
assert dataplatform.graph(
name_flux=name_flux, meta_flux=meta_flux, meta_table=meta_table
).edges == [
Edge(
arrow="flux-foo_flux",
source=Node(
name="test-raw-username",
metadata={"id": "test-raw-username", "partitions": ["username.csv"]},
),
target=Node(
name="test-bronze-foo", metadata={"id": "test-bronze-foo", "partitions": []}
),
metadata={"name": "foo"},
),
Edge(
arrow="flux-foo_flux",
source=Node(
name="test-raw-recovery",
metadata={
"id": "test-raw-recovery",
"partitions": ["2022.csv", "2023.csv", "2024.csv"],
},
),
target=Node(
name="test-bronze-foo", metadata={"id": "test-bronze-foo", "partitions": []}
),
metadata={"name": "foo"},
),
Edge(
arrow="flux-copy_flux",
source=Node(
name="test-raw-username",
metadata={"id": "test-raw-username", "partitions": ["username.csv"]},
),
target=Node(
name="test-bronze-username",
metadata={"id": "test-bronze-username", "partitions": []},
),
metadata={"name": "copy"},
),
]
def test_execute_flux(dataplatform: DataPlateform):
meta = dataplatform.execute_flux("foo_flux")
assert meta.data == {"who": "foo"}
assert dataplatform.repository("test").schema("test-bronze").tables == []
meta = dataplatform.execute_flux("copy_flux")
assert meta.data == {"src_size": 283, "tgt_size": 283}
assert dataplatform.repository("test").schema("test-bronze").tables == ["test-bronze-username"]

Binary file not shown.

@@ -1,7 +0,0 @@
Username;Identifier;First name;Last name
booker12;9012;Rachel;Booker
grey07;2070;Laura;Grey
johnson81;4081;Craig;Johnson
jenkins46;9346;Mary;Jenkins
smith79;5079;Jamie;Smith

@@ -1,106 +0,0 @@
import pytest
from plesna.graph.graph import Graph
from plesna.models.graphs import Edge, Node
def test_append_nodes():
nodeA = Node(name="A")
nodeB = Node(name="B")
graph = Graph()
graph.add_node(nodeA)
graph.add_node(nodeB)
assert graph.nodes == {nodeA, nodeB}
def test_append_edges():
nodeA = Node(name="A")
nodeB = Node(name="B")
nodeC = Node(name="C")
edge1 = Edge(arrow="arrow", source=nodeA, target=nodeC)
edge2 = Edge(arrow="arrow", source=nodeB, target=nodeC)
graph = Graph()
graph.add_edge(edge1)
graph.add_edge(edge2)
assert graph.nodes == {nodeA, nodeB, nodeC}
def test_init_edges_nodes():
nodeA = Node(name="A")
nodeB = Node(name="B")
nodeC = Node(name="C")
edge1 = Edge(arrow="arrow", source=nodeB, target=nodeC)
graph = Graph()
graph.add_node(nodeA)
graph.add_edge(edge1)
assert graph.nodes == {nodeA, nodeB, nodeC}
@pytest.fixture
def nodes():
return {
"A": Node(name="A"),
"B": Node(name="B"),
"C": Node(name="C"),
"D": Node(name="D"),
}
@pytest.fixture
def dag_edges(nodes):
return {
"1": Edge(arrow="arrow", source=nodes["A"], target=nodes["C"]),
"2": Edge(arrow="arrow", source=nodes["B"], target=nodes["C"]),
"3": Edge(arrow="arrow", source=nodes["C"], target=nodes["D"]),
}
@pytest.fixture
def notdag_edges(nodes):
return {
"1": Edge(arrow="arrow", source=nodes["A"], target=nodes["C"]),
"2": Edge(arrow="arrow", source=nodes["B"], target=nodes["C"]),
"3": Edge(arrow="arrow", source=nodes["C"], target=nodes["D"]),
"4": Edge(arrow="arrow", source=nodes["D"], target=nodes["B"]),
}
def test_get_edges_from(nodes, dag_edges):
edges = dag_edges
graph = Graph(edges=edges.values())
assert graph.get_edges_from(nodes["A"]) == [edges["1"]]
def test_get_targets_from(nodes, dag_edges):
edges = dag_edges
graph = Graph(edges=edges.values())
assert graph.get_direct_targets_from(nodes["A"]) == set([nodes["C"]])
assert graph.get_direct_targets_from(nodes["C"]) == set([nodes["D"]])
assert graph.get_direct_targets_from(nodes["D"]) == set()
assert graph.get_targets_from(nodes["A"]) == set([nodes["C"], nodes["D"]])
def test_get_sources_from(nodes, dag_edges):
edges = dag_edges
graph = Graph(edges=edges.values())
assert graph.get_direct_sources_from(nodes["A"]) == set()
assert graph.get_direct_sources_from(nodes["C"]) == set([nodes["A"], nodes["B"]])
assert graph.get_direct_sources_from(nodes["D"]) == set([nodes["C"]])
assert graph.get_sources_from(nodes["D"]) == set([nodes["A"], nodes["B"], nodes["C"]])
def test_valid_dag(dag_edges, notdag_edges):
graph = Graph(edges=dag_edges.values())
assert graph.is_dag()
graph = Graph(edges=notdag_edges.values())
assert not graph.is_dag()

@@ -1,43 +0,0 @@
from plesna.graph.graph import Graph
from plesna.graph.graph_set import GraphSet
from plesna.models.graphs import Edge, EdgeOnSet, Node
def test_init():
graph_set = GraphSet()
nodeA = Node(name="A")
nodeB = Node(name="B")
nodeC = Node(name="C")
edge1 = EdgeOnSet(arrow="arrow", sources=[nodeA, nodeB], targets=[nodeC])
graph_set.append(edge1)
assert graph_set.node_sets == {frozenset([nodeA, nodeB]), frozenset([nodeC])}
def test_to_graph():
graph_set = GraphSet()
nodeA = Node(name="A")
nodeB = Node(name="B")
nodeC = Node(name="C")
nodeD = Node(name="D")
edge1 = EdgeOnSet(arrow="arrow-AB-C", sources=[nodeA, nodeB], targets=[nodeC])
edge2 = EdgeOnSet(arrow="arrow-C-D", sources=[nodeC], targets=[nodeD])
graph_set.append(edge1)
graph_set.append(edge2)
graph = graph_set.to_graph()
assert graph.nodes == {
nodeA,
nodeB,
nodeC,
nodeD,
}
assert graph.edges == [
Edge(arrow="arrow-AB-C", source=nodeA, target=nodeC),
Edge(arrow="arrow-AB-C", source=nodeB, target=nodeC),
Edge(arrow="arrow-C-D", source=nodeC, target=nodeD),
]

@@ -1,18 +0,0 @@
import pytest
from plesna.libs.string_tools import StringToolsError, extract_values_from_pattern
def test_extract_values_from_pattern():
source = "id:truc-bidule-machin"
pattern = "id:{champ1}-{champ2}-machin"
assert extract_values_from_pattern(pattern, source) == {"champ1": "truc", "champ2": "bidule"}
def test_extract_values_from_pattern_no_match():
source = "id:truc-bidule"
pattern = "id:{champ1}-{champ2}-machin"
with pytest.raises(StringToolsError):
extract_values_from_pattern(pattern, source)

@@ -1,3 +0,0 @@
Identifier,One-time password
9012,12se74
2070,04ap67

@@ -1,4 +0,0 @@
Identifier,One-time password
9012,32ui83
9346,14ju73
5079,09ja61

@@ -1,4 +0,0 @@
Identifier,One-time password
9012,74iu23
2070,12io89
5079,85nc83

Binary file not shown.

@@ -1,6 +0,0 @@
Username,Identifier,First name,Last name,Department,Location
booker12,9012,Rachel,Booker,Sales,Manchester
grey07,2070,Laura,Grey,Depot,London
johnson81,4081,Craig,Johnson,Depot,London
jenkins46,9346,Mary,Jenkins,Engineering,Manchester
smith79,5079,Jamie,Smith,Engineering,Manchester

@@ -1,115 +0,0 @@
import shutil
from pathlib import Path
import pytest
from plesna.storage.data_repository.fs_data_repository import FSDataRepository
FIXTURE_DIR = Path(__file__).parent.parent / Path("./raw_datas/")
@pytest.fixture
def location(tmp_path):
schema = tmp_path / "schema"
example_src = FIXTURE_DIR
assert example_src.exists()
shutil.copytree(src=example_src.absolute(), dst=schema.absolute())
return tmp_path
def test_init(location):
repo = FSDataRepository("example", "example", location)
assert repo.ls() == [
"schema",
]
assert repo.ls(dir="schema") == [
"username",
"recovery",
"salary",
]
assert repo.ls(recursive=True) == [
"schema",
"schema/username",
"schema/recovery",
"schema/salary",
"schema/username/username.csv",
"schema/recovery/2022.csv",
"schema/recovery/2023.csv",
"schema/recovery/2024.csv",
"schema/salary/salary.pdf",
]
@pytest.fixture
def repository(location) -> FSDataRepository:
return FSDataRepository("repo_id", "example", location)
def test_list_schemas(repository):
assert repository.schemas() == ["repo_id-schema"]
def test_describe_schema(location, repository):
schema = repository.schema("repo_id-schema")
assert schema.name == "schema"
assert schema.id == "repo_id-schema"
assert schema.repo_id == "repo_id"
assert schema.value == str(location / "schema")
assert schema.tables == [
"repo_id-schema-username",
"repo_id-schema-recovery",
"repo_id-schema-salary",
]
def test_list_tables_schema(repository):
assert repository.schema("repo_id-schema").tables == [
"repo_id-schema-username",
"repo_id-schema-recovery",
"repo_id-schema-salary",
]
assert repository.tables("repo_id-schema") == [
"repo_id-schema-username",
"repo_id-schema-recovery",
"repo_id-schema-salary",
]
assert repository.tables() == [
"repo_id-schema-username",
"repo_id-schema-recovery",
"repo_id-schema-salary",
]
def test_describe_table(location, repository):
table = repository.table("repo_id-schema-username")
assert table.id == "repo_id-schema-username"
assert table.repo_id == "repo_id"
assert table.schema_id == "repo_id-schema"
assert table.name == "username"
assert table.value == str(location / "schema" / "username")
assert table.partitions == ["username.csv"]
assert table.datas == [table.value + "/username.csv"]
def test_describe_table_with_partitions(location, repository):
table = repository.table("repo_id-schema-recovery")
assert table.id == "repo_id-schema-recovery"
assert table.repo_id == "repo_id"
assert table.schema_id == "repo_id-schema"
assert table.name == "recovery"
assert table.value == str(location / "schema" / "recovery")
assert table.partitions == [
"2022.csv",
"2023.csv",
"2024.csv",
]
assert table.datas == [
table.value + "/2022.csv",
table.value + "/2023.csv",
table.value + "/2024.csv",
]

@@ -1,182 +0,0 @@
from datetime import datetime
import shutil
from pathlib import Path
import pytest
from plesna.models.flux import FluxMetaData
from plesna.storage.metadata_repository.fs_metadata_repository import FSMetaDataRepository
from plesna.storage.metadata_repository.metadata_repository import ExecutionLog, ModificationLog
@pytest.fixture
def location(tmp_path):
catalogpath = tmp_path / "catalog"
catalogpath.mkdir()
return catalogpath
def test_init(location):
repo = FSMetaDataRepository(location)
@pytest.fixture
def metadata_repository(location) -> FSMetaDataRepository:
return FSMetaDataRepository(location)
def test_add_flux(location, metadata_repository):
flux_id = "my_flux"
metadata_repository.add_flux(flux_id)
metadata_filepath = location / metadata_repository.OBJECTS["flux"]["filemodel"].format(
id=flux_id
)
assert metadata_filepath.exists()
with open(metadata_filepath, "r") as csvfile:
content = csvfile.read()
assert content == "datetime,output\n"
def test_add_and_list_fluxes(metadata_repository):
flux_ids = ["my_flux", "flux2", "blahblah"]
for f in flux_ids:
metadata_repository.add_flux(f)
assert metadata_repository.fluxes() == flux_ids
def test_register_flux_execution(location, metadata_repository):
flux_id = "my_flux"
metadata_repository.add_flux(flux_id)
metadata_repository.register_flux_execution(
flux_id,
datetime(2023, 3, 15, 14, 30),
output={
"truc": "machin",
},
)
metadata_filepath = location / metadata_repository.OBJECTS["flux"]["filemodel"].format(
id=flux_id
)
with open(metadata_filepath, "r") as csvfile:
content = csvfile.read()
assert (
content == 'datetime,output\n2023-03-15T14:30:00,"{""data"":{""truc"":""machin""}}"\n'
)
def test_register_and_get_exec_logs(metadata_repository):
flux_id = "my_flux"
metadata_repository.add_flux(flux_id)
metadata_repository.register_flux_execution(
flux_id,
datetime(2023, 3, 15, 14, 30),
output={"truc": "machin"},
)
metadata_repository.register_flux_execution(
flux_id,
datetime(2024, 3, 15, 14, 30),
output={
"truc": "chose",
},
)
logs = metadata_repository.flux_logs(flux_id)
assert logs == [
ExecutionLog(
datetime=datetime(2023, 3, 15, 14, 30),
output=FluxMetaData(data={"truc": "machin"}),
),
ExecutionLog(
datetime=datetime(2024, 3, 15, 14, 30),
output=FluxMetaData(data={"truc": "chose"}),
),
]
def test_register_and_get_last_exec_log(metadata_repository):
flux_id = "my_flux"
metadata_repository.add_flux(flux_id)
metadata_repository.register_flux_execution(
flux_id,
datetime(2023, 3, 15, 14, 30),
output={"truc": "machin"},
)
metadata_repository.register_flux_execution(
flux_id,
datetime(2024, 3, 15, 14, 30),
output={
"truc": "chose",
},
)
logs = metadata_repository.flux(flux_id)
assert logs == ExecutionLog(
datetime=datetime(2024, 3, 15, 14, 30),
output=FluxMetaData(data={"truc": "chose"}),
)
def test_add_and_list_tables(metadata_repository):
table_ids = ["my_table", "table2", "blahblah"]
for f in table_ids:
metadata_repository.add_table(f)
assert metadata_repository.tables() == table_ids
def test_register_table_modification(location, metadata_repository):
table_id = "my_table"
flux_id = "my_flux"
metadata_repository.add_table(table_id)
metadata_repository.register_table_modification(
table_id, datetime(2023, 3, 15, 14, 30), flux_id
)
metadata_filepath = location / metadata_repository.OBJECTS["table"]["filemodel"].format(
id=table_id
)
with open(metadata_filepath, "r") as csvfile:
content = csvfile.read()
assert content == "datetime,flux_id\n2023-03-15T14:30:00,my_flux\n"
def test_register_and_get_mod_logs(metadata_repository):
table_id = "my_table"
flux_id = "my_flux"
metadata_repository.add_table(table_id)
metadata_repository.register_table_modification(
table_id, datetime(2023, 3, 15, 14, 30), flux_id
)
metadata_repository.register_table_modification(
table_id, datetime(2024, 3, 15, 14, 30), flux_id
)
logs = metadata_repository.table_logs(table_id)
assert logs == [
ModificationLog(datetime=datetime(2023, 3, 15, 14, 30), flux_id=flux_id),
ModificationLog(datetime=datetime(2024, 3, 15, 14, 30), flux_id=flux_id),
]
def test_register_and_get_last_log(metadata_repository):
table_id = "my_table"
flux_id = "my_flux"
metadata_repository.add_table(table_id)
metadata_repository.register_table_modification(
table_id, datetime(2023, 3, 15, 14, 30), flux_id
)
metadata_repository.register_table_modification(
table_id, datetime(2024, 3, 15, 14, 30), flux_id
)
logs = metadata_repository.table(table_id)
assert logs == ModificationLog(datetime=datetime(2024, 3, 15, 14, 30), flux_id=flux_id)

@@ -1,131 +0,0 @@
import pandas as pd
import pytest
from dashboard.libs.flux.flux import Flux, consume_flux
from dashboard.libs.repository.repository import AbstractRepository
FakeTable = pd.DataFrame
FakeSchema = dict[str, pd.DataFrame]
FakeSchemas = dict[str, FakeSchema]
class FakeRepository(AbstractRepository):
def __init__(self, schemas: FakeSchemas):
self._schemas = {}
for schema_name, tables in schemas.items():
schema = {}
for table, df in tables.items():
schema[table] = {
"df": df,
"metadata": {
"status": "new",
"qty_read": 0,
"qty_write": 0,
},
}
self._schemas[schema_name] = schema
def schemas(self):
"""List schemas"""
return list(self._schemas.keys())
def tables(self, schema):
"""List table's name in schema"""
return list(self._schemas[schema].keys())
def infos(self, table: str, schema: str) -> dict[str, str]:
"""Get infos about the table"""
return self._schemas[schema][table]["metadata"]
def read(self, table, schema) -> pd.DataFrame:
"""Get content of the table"""
self._schemas[schema][table]["metadata"]["qty_read"] += 1
return self._schemas[schema][table]["df"]
def write(self, content, table, schema) -> dict[str, str]:
"""Write content into the table"""
try:
self._schemas[schema][table]["df"] = content
except KeyError:
self._schemas[schema][table] = {
"df": content,
"metadata": {
"status": "new",
"qty_read": 0,
"qty_write": 0,
},
}
self._schemas[schema][table]["metadata"]["status"] = "modified"
self._schemas[schema][table]["metadata"]["qty_write"] += 1
return self.infos(table, schema)
def delete_table(self, table, schema):
"""Delete the table"""
raise NotImplementedError
def test_fakerepository():
fakerepository = FakeRepository(
{
"foo": {
"table1": pd.DataFrame({"A": []}),
"table2": pd.DataFrame({"B": []}),
},
"bar": {
"table1": pd.DataFrame({"C": []}),
"table2": pd.DataFrame({"D": []}),
},
}
)
assert fakerepository.schemas() == ["foo", "bar"]
assert fakerepository.tables("foo") == ["table1", "table2"]
assert fakerepository.infos("table1", "foo") == {
"status": "new",
"qty_read": 0,
"qty_write": 0,
}
assert fakerepository.read("table1", "foo").equals(pd.DataFrame({"A": []}))
assert fakerepository.infos("table1", "foo") == {
"status": "new",
"qty_read": 1,
"qty_write": 0,
}
df = pd.DataFrame({"A": [1, 2]})
assert fakerepository.write(df, "table1", "foo") == {
"status": "modified",
"qty_read": 1,
"qty_write": 1,
}
def test_consume_flux():
source_repository = FakeRepository(
{
"source": {
"table1": pd.DataFrame({"A": [1, 2, 3]}),
},
}
)
dest_repository = FakeRepository(
{
"destination": {},
}
)
repositories = {
"source": source_repository,
"dest": dest_repository,
}
transformation = lambda dfs: {"dest": dfs[0] * 2}
flux = Flux(
sources=[{"repository": "source", "schema": "source", "table": "table1"}],
destinations={
"dest": {"repository": "dest", "schema": "destination", "table": "table1"}
},
transformation=transformation,
)
state = consume_flux(flux, repositories)
assert state.statuses["dest"] == {'status': 'modified', 'qty_read': 0, 'qty_write': 1}
assert dest_repository.read("table1", "destination").equals(pd.DataFrame({"A": [2, 4, 6]}))

uv.lock generated

@@ -1,7 +0,0 @@
version = 1
requires-python = ">=3.13"
[[package]]
name = "plesna"
version = "0.1.0"
source = { virtual = "." }