Compare commits

...

4 Commits

7 changed files with 190 additions and 24 deletions

View File

@ -1,7 +1,9 @@
from collections.abc import Callable
from plesna.compute.consume_flux import consume_flux from plesna.compute.consume_flux import consume_flux
from plesna.graph.graph import Graph, Node from plesna.graph.graph import Graph
from plesna.graph.graph_set import EdgeOnSet, GraphSet from plesna.graph.graph_set import GraphSet
from plesna.models.flux import Flux, FluxMetaData from plesna.models.flux import Flux, FluxMetaData
from plesna.models.graphs import Node
from plesna.models.libs.flux_graph import flux_to_edgeonset from plesna.models.libs.flux_graph import flux_to_edgeonset
from plesna.storage.repository.repository import Repository from plesna.storage.repository.repository import Repository
@ -49,20 +51,39 @@ class DataPlateform:
raise DataPlateformError("The flux {name} is not registered") raise DataPlateformError("The flux {name} is not registered")
return consume_flux(self._fluxes[name]) return consume_flux(self._fluxes[name])
@property def graphset(
def graphset(self) -> GraphSet: self,
name_flux: Callable = lambda flux: flux.id,
meta_flux: Callable = lambda _: {},
name_table: Callable = lambda table: table.id,
meta_table: Callable = lambda _: {},
) -> GraphSet:
graphset = GraphSet() graphset = GraphSet()
for flux in self._fluxes.values(): for flux in self._fluxes.values():
edge = flux_to_edgeonset(flux) edge = flux_to_edgeonset(flux, name_flux, meta_flux, name_table, meta_table)
graphset.append(edge) graphset.append(edge)
return graphset return graphset
@property def graph(
def graph(self) -> Graph: self,
graph = Graph() name_flux: Callable = lambda flux: flux.id,
meta_flux: Callable = lambda _: {},
name_table: Callable = lambda table: table.id,
meta_table: Callable = lambda _: {},
) -> Graph:
"""Get the graph of fluxes and tables
:param name_flux: function on flux to name the edge
:param meta_flux: function on flux to attribute metadata to edge
:param name_table: function on table to name nodes
:param meta_table: function on flux to attribute metadata to nodes
"""
graph = self.graphset(name_flux, meta_flux, name_table, meta_table).to_graph()
for repo in self._repositories.values(): for repo in self._repositories.values():
for schema in repo.schemas(): for schema in repo.schemas():
for table in repo.tables(schema): for table in repo.tables(schema):
graph.add_node(Node(name=table)) t = repo.table(table)
graph.add_node(Node(name=name_table(t), metadata=meta_table(t)))
return graph return graph

View File

@ -1,3 +1,4 @@
from typing import Set
from pydantic import BaseModel from pydantic import BaseModel
from functools import reduce from functools import reduce
from plesna.models.graphs import Node, Edge from plesna.models.graphs import Node, Edge
@ -5,8 +6,8 @@ from plesna.models.graphs import Node, Edge
class Graph: class Graph:
def __init__(self, nodes: list[Node] = [], edges: list[Edge] = []): def __init__(self, nodes: list[Node] = [], edges: list[Edge] = []):
self._edges = [] self._edges: list[Edge] = []
self._nodes = set() self._nodes: Set[Node] = set()
self.add_edges(edges) self.add_edges(edges)
self.add_nodes(nodes) self.add_nodes(nodes)

View File

@ -14,6 +14,10 @@ class GraphSet:
self._node_sets.add(frozenset(edge.sources)) self._node_sets.add(frozenset(edge.sources))
self._node_sets.add(frozenset(edge.targets)) self._node_sets.add(frozenset(edge.targets))
@property
def edges(self) -> Set[EdgeOnSet]:
return self._edges
@property @property
def node_sets(self) -> Set[frozenset]: def node_sets(self) -> Set[frozenset]:
return self._node_sets return self._node_sets
@ -24,7 +28,7 @@ class GraphSet:
graph.add_nodes(node_set) graph.add_nodes(node_set)
for edge in self._edges: for edge in self._edges:
flatten_edge = [ flatten_edge = [
Edge(arrow=edge.arrow, source=s, target=t, edge_kwrds=edge.edge_kwrds) Edge(arrow=edge.arrow, source=s, target=t, metadata=edge.metadata)
for (s, t) in product(edge.sources, edge.targets) for (s, t) in product(edge.sources, edge.targets)
] ]
graph.add_edges(flatten_edge) graph.add_edges(flatten_edge)

View File

@ -3,6 +3,7 @@ from pydantic import BaseModel
class Node(BaseModel): class Node(BaseModel):
name: str name: str
metadata: dict = {}
def __hash__(self): def __hash__(self):
return hash(self.name) return hash(self.name)
@ -12,11 +13,11 @@ class Edge(BaseModel):
arrow: str arrow: str
source: Node source: Node
target: Node target: Node
edge_kwrds: dict = {} metadata: dict = {}
class EdgeOnSet(BaseModel): class EdgeOnSet(BaseModel):
arrow: str arrow: str
sources: list[Node] sources: list[Node]
targets: list[Node] targets: list[Node]
edge_kwrds: dict = {} metadata: dict = {}

View File

@ -0,0 +1,29 @@
from collections.abc import Callable
from plesna.models.flux import Flux
from plesna.models.graphs import EdgeOnSet, Node
def flux_to_edgeonset(
flux: Flux,
name_flux: Callable = lambda flux: flux.id,
meta_flux: Callable = lambda _: {},
name_table: Callable = lambda table: table.id,
meta_table: Callable = lambda _: {},
) -> EdgeOnSet:
"""Convert a flux to an EdgeOnSet
:param flux: the flux
:name_flux: function on flux which returns the name of the arrow from flux
:meta_flux: function on flux which returns a dict to store in metadata field
:name_table: function on table which returns the name of node
:meta_table: function on table which returns metadata of node
"""
sources = [Node(name=name_table(s), metadata=meta_table(s)) for s in flux.sources]
targets = [Node(name=name_table(s), metadata=meta_table(s)) for s in flux.targets]
return EdgeOnSet(
arrow=name_flux(flux),
sources=sources,
targets=targets,
metadata=meta_flux(flux),
)

View File

@ -36,8 +36,9 @@ class Table(BaseModel):
schema_id: str schema_id: str
name: str name: str
value: str value: str
partitions: list[str] = []
datas: list[str] datas: list[str]
partitions: list[str] = []
metadata: dict = {}
class Partition(BaseModel): class Partition(BaseModel):

View File

@ -4,7 +4,7 @@ from pathlib import Path
import pytest import pytest
from plesna.dataplatform import DataPlateform from plesna.dataplatform import DataPlateform
from plesna.models.graphs import Node from plesna.models.graphs import Edge, EdgeOnSet, Node
from plesna.models.flux import Flux, Transformation from plesna.models.flux import Flux, Transformation
from plesna.storage.repository.fs_repository import FSRepository from plesna.storage.repository.fs_repository import FSRepository
@ -127,14 +127,8 @@ def test_listing_content(dataplatform: DataPlateform):
] ]
def test_content_from_graph(dataplatform: DataPlateform): def test_content_from_graphset(dataplatform: DataPlateform):
assert dataplatform.graph.nodes == { assert dataplatform.graphset().node_sets == {
Node(name="test-raw-recovery", infos={}),
Node(name="test-raw-salary", infos={}),
Node(name="test-raw-username", infos={}),
}
assert dataplatform.graphset.node_sets == {
frozenset( frozenset(
{ {
Node(name="test-bronze-username"), Node(name="test-bronze-username"),
@ -157,6 +151,121 @@ def test_content_from_graph(dataplatform: DataPlateform):
} }
), ),
} }
assert dataplatform.graphset().edges == [
EdgeOnSet(
arrow="foo_flux",
sources=[Node(name="test-raw-username"), Node(name="test-raw-recovery")],
targets=[Node(name="test-bronze-foo")],
metadata={},
),
EdgeOnSet(
arrow="copy_flux",
sources=[Node(name="test-raw-username")],
targets=[Node(name="test-bronze-username")],
metadata={},
),
]
def test_content_from_graph(dataplatform: DataPlateform):
assert dataplatform.graph().nodes == {
Node(name="test-raw-recovery", metadata={}),
Node(name="test-raw-salary", metadata={}),
Node(name="test-raw-username", metadata={}),
Node(name="test-bronze-username", metadata={}),
Node(name="test-bronze-foo", metadata={}),
Node(name="test-raw-username", metadata={}),
}
assert dataplatform.graph().edges == [
Edge(
arrow="foo_flux",
source=Node(name="test-raw-username"),
target=Node(name="test-bronze-foo"),
metadata={},
),
Edge(
arrow="foo_flux",
source=Node(name="test-raw-recovery"),
target=Node(name="test-bronze-foo"),
metadata={},
),
Edge(
arrow="copy_flux",
source=Node(name="test-raw-username"),
target=Node(name="test-bronze-username"),
metadata={},
),
]
def test_content_from_graph_arguments(dataplatform: DataPlateform):
name_flux = lambda flux: f"flux-{flux.id}"
meta_flux = lambda flux: {"name": flux.name}
meta_table = lambda table: {"id": table.id, "partitions": table.partitions}
assert dataplatform.graph(
name_flux=name_flux, meta_flux=meta_flux, meta_table=meta_table
).nodes == {
Node(name="test-bronze-foo", metadata={"id": "test-bronze-foo", "partitions": []}),
Node(
name="test-raw-salary", metadata={"id": "test-raw-salary", "partitions": ["salary.pdf"]}
),
Node(
name="test-raw-recovery",
metadata={
"id": "test-raw-recovery",
"partitions": ["2022.csv", "2023.csv", "2024.csv"],
},
),
Node(
name="test-bronze-username", metadata={"id": "test-bronze-username", "partitions": []}
),
Node(
name="test-raw-username",
metadata={"id": "test-raw-username", "partitions": ["username.csv"]},
),
}
assert dataplatform.graph(
name_flux=name_flux, meta_flux=meta_flux, meta_table=meta_table
).edges == [
Edge(
arrow="flux-foo_flux",
source=Node(
name="test-raw-username",
metadata={"id": "test-raw-username", "partitions": ["username.csv"]},
),
target=Node(
name="test-bronze-foo", metadata={"id": "test-bronze-foo", "partitions": []}
),
metadata={"name": "foo"},
),
Edge(
arrow="flux-foo_flux",
source=Node(
name="test-raw-recovery",
metadata={
"id": "test-raw-recovery",
"partitions": ["2022.csv", "2023.csv", "2024.csv"],
},
),
target=Node(
name="test-bronze-foo", metadata={"id": "test-bronze-foo", "partitions": []}
),
metadata={"name": "foo"},
),
Edge(
arrow="flux-copy_flux",
source=Node(
name="test-raw-username",
metadata={"id": "test-raw-username", "partitions": ["username.csv"]},
),
target=Node(
name="test-bronze-username",
metadata={"id": "test-bronze-username", "partitions": []},
),
metadata={"name": "copy"},
),
]
def test_execute_flux(dataplatform: DataPlateform): def test_execute_flux(dataplatform: DataPlateform):