Compare commits
4 Commits
5ebde14be9
...
bb691acc14
Author | SHA1 | Date | |
---|---|---|---|
bb691acc14 | |||
90472ac868 | |||
0ae6439217 | |||
2f170d91b6 |
@ -1,7 +1,9 @@
|
|||||||
|
from collections.abc import Callable
|
||||||
from plesna.compute.consume_flux import consume_flux
|
from plesna.compute.consume_flux import consume_flux
|
||||||
from plesna.graph.graph import Graph, Node
|
from plesna.graph.graph import Graph
|
||||||
from plesna.graph.graph_set import EdgeOnSet, GraphSet
|
from plesna.graph.graph_set import GraphSet
|
||||||
from plesna.models.flux import Flux, FluxMetaData
|
from plesna.models.flux import Flux, FluxMetaData
|
||||||
|
from plesna.models.graphs import Node
|
||||||
from plesna.models.libs.flux_graph import flux_to_edgeonset
|
from plesna.models.libs.flux_graph import flux_to_edgeonset
|
||||||
from plesna.storage.repository.repository import Repository
|
from plesna.storage.repository.repository import Repository
|
||||||
|
|
||||||
@ -49,20 +51,39 @@ class DataPlateform:
|
|||||||
raise DataPlateformError("The flux {name} is not registered")
|
raise DataPlateformError("The flux {name} is not registered")
|
||||||
return consume_flux(self._fluxes[name])
|
return consume_flux(self._fluxes[name])
|
||||||
|
|
||||||
@property
|
def graphset(
|
||||||
def graphset(self) -> GraphSet:
|
self,
|
||||||
|
name_flux: Callable = lambda flux: flux.id,
|
||||||
|
meta_flux: Callable = lambda _: {},
|
||||||
|
name_table: Callable = lambda table: table.id,
|
||||||
|
meta_table: Callable = lambda _: {},
|
||||||
|
) -> GraphSet:
|
||||||
graphset = GraphSet()
|
graphset = GraphSet()
|
||||||
for flux in self._fluxes.values():
|
for flux in self._fluxes.values():
|
||||||
edge = flux_to_edgeonset(flux)
|
edge = flux_to_edgeonset(flux, name_flux, meta_flux, name_table, meta_table)
|
||||||
graphset.append(edge)
|
graphset.append(edge)
|
||||||
|
|
||||||
return graphset
|
return graphset
|
||||||
|
|
||||||
@property
|
def graph(
|
||||||
def graph(self) -> Graph:
|
self,
|
||||||
graph = Graph()
|
name_flux: Callable = lambda flux: flux.id,
|
||||||
|
meta_flux: Callable = lambda _: {},
|
||||||
|
name_table: Callable = lambda table: table.id,
|
||||||
|
meta_table: Callable = lambda _: {},
|
||||||
|
) -> Graph:
|
||||||
|
"""Get the graph of fluxes and tables
|
||||||
|
|
||||||
|
:param name_flux: function on flux to name the edge
|
||||||
|
:param meta_flux: function on flux to attribute metadata to edge
|
||||||
|
:param name_table: function on table to name nodes
|
||||||
|
:param meta_table: function on flux to attribute metadata to nodes
|
||||||
|
|
||||||
|
"""
|
||||||
|
graph = self.graphset(name_flux, meta_flux, name_table, meta_table).to_graph()
|
||||||
for repo in self._repositories.values():
|
for repo in self._repositories.values():
|
||||||
for schema in repo.schemas():
|
for schema in repo.schemas():
|
||||||
for table in repo.tables(schema):
|
for table in repo.tables(schema):
|
||||||
graph.add_node(Node(name=table))
|
t = repo.table(table)
|
||||||
|
graph.add_node(Node(name=name_table(t), metadata=meta_table(t)))
|
||||||
return graph
|
return graph
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
from typing import Set
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
from functools import reduce
|
from functools import reduce
|
||||||
from plesna.models.graphs import Node, Edge
|
from plesna.models.graphs import Node, Edge
|
||||||
@ -5,8 +6,8 @@ from plesna.models.graphs import Node, Edge
|
|||||||
|
|
||||||
class Graph:
|
class Graph:
|
||||||
def __init__(self, nodes: list[Node] = [], edges: list[Edge] = []):
|
def __init__(self, nodes: list[Node] = [], edges: list[Edge] = []):
|
||||||
self._edges = []
|
self._edges: list[Edge] = []
|
||||||
self._nodes = set()
|
self._nodes: Set[Node] = set()
|
||||||
self.add_edges(edges)
|
self.add_edges(edges)
|
||||||
self.add_nodes(nodes)
|
self.add_nodes(nodes)
|
||||||
|
|
||||||
|
@ -14,6 +14,10 @@ class GraphSet:
|
|||||||
self._node_sets.add(frozenset(edge.sources))
|
self._node_sets.add(frozenset(edge.sources))
|
||||||
self._node_sets.add(frozenset(edge.targets))
|
self._node_sets.add(frozenset(edge.targets))
|
||||||
|
|
||||||
|
@property
|
||||||
|
def edges(self) -> Set[EdgeOnSet]:
|
||||||
|
return self._edges
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def node_sets(self) -> Set[frozenset]:
|
def node_sets(self) -> Set[frozenset]:
|
||||||
return self._node_sets
|
return self._node_sets
|
||||||
@ -24,7 +28,7 @@ class GraphSet:
|
|||||||
graph.add_nodes(node_set)
|
graph.add_nodes(node_set)
|
||||||
for edge in self._edges:
|
for edge in self._edges:
|
||||||
flatten_edge = [
|
flatten_edge = [
|
||||||
Edge(arrow=edge.arrow, source=s, target=t, edge_kwrds=edge.edge_kwrds)
|
Edge(arrow=edge.arrow, source=s, target=t, metadata=edge.metadata)
|
||||||
for (s, t) in product(edge.sources, edge.targets)
|
for (s, t) in product(edge.sources, edge.targets)
|
||||||
]
|
]
|
||||||
graph.add_edges(flatten_edge)
|
graph.add_edges(flatten_edge)
|
||||||
|
@ -3,6 +3,7 @@ from pydantic import BaseModel
|
|||||||
|
|
||||||
class Node(BaseModel):
|
class Node(BaseModel):
|
||||||
name: str
|
name: str
|
||||||
|
metadata: dict = {}
|
||||||
|
|
||||||
def __hash__(self):
|
def __hash__(self):
|
||||||
return hash(self.name)
|
return hash(self.name)
|
||||||
@ -12,11 +13,11 @@ class Edge(BaseModel):
|
|||||||
arrow: str
|
arrow: str
|
||||||
source: Node
|
source: Node
|
||||||
target: Node
|
target: Node
|
||||||
edge_kwrds: dict = {}
|
metadata: dict = {}
|
||||||
|
|
||||||
|
|
||||||
class EdgeOnSet(BaseModel):
|
class EdgeOnSet(BaseModel):
|
||||||
arrow: str
|
arrow: str
|
||||||
sources: list[Node]
|
sources: list[Node]
|
||||||
targets: list[Node]
|
targets: list[Node]
|
||||||
edge_kwrds: dict = {}
|
metadata: dict = {}
|
||||||
|
29
plesna/models/libs/flux_graph.py
Normal file
29
plesna/models/libs/flux_graph.py
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
from collections.abc import Callable
|
||||||
|
from plesna.models.flux import Flux
|
||||||
|
from plesna.models.graphs import EdgeOnSet, Node
|
||||||
|
|
||||||
|
|
||||||
|
def flux_to_edgeonset(
|
||||||
|
flux: Flux,
|
||||||
|
name_flux: Callable = lambda flux: flux.id,
|
||||||
|
meta_flux: Callable = lambda _: {},
|
||||||
|
name_table: Callable = lambda table: table.id,
|
||||||
|
meta_table: Callable = lambda _: {},
|
||||||
|
) -> EdgeOnSet:
|
||||||
|
"""Convert a flux to an EdgeOnSet
|
||||||
|
|
||||||
|
:param flux: the flux
|
||||||
|
:name_flux: function on flux which returns the name of the arrow from flux
|
||||||
|
:meta_flux: function on flux which returns a dict to store in metadata field
|
||||||
|
:name_table: function on table which returns the name of node
|
||||||
|
:meta_table: function on table which returns metadata of node
|
||||||
|
|
||||||
|
"""
|
||||||
|
sources = [Node(name=name_table(s), metadata=meta_table(s)) for s in flux.sources]
|
||||||
|
targets = [Node(name=name_table(s), metadata=meta_table(s)) for s in flux.targets]
|
||||||
|
return EdgeOnSet(
|
||||||
|
arrow=name_flux(flux),
|
||||||
|
sources=sources,
|
||||||
|
targets=targets,
|
||||||
|
metadata=meta_flux(flux),
|
||||||
|
)
|
@ -36,8 +36,9 @@ class Table(BaseModel):
|
|||||||
schema_id: str
|
schema_id: str
|
||||||
name: str
|
name: str
|
||||||
value: str
|
value: str
|
||||||
partitions: list[str] = []
|
|
||||||
datas: list[str]
|
datas: list[str]
|
||||||
|
partitions: list[str] = []
|
||||||
|
metadata: dict = {}
|
||||||
|
|
||||||
|
|
||||||
class Partition(BaseModel):
|
class Partition(BaseModel):
|
||||||
|
@ -4,7 +4,7 @@ from pathlib import Path
|
|||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from plesna.dataplatform import DataPlateform
|
from plesna.dataplatform import DataPlateform
|
||||||
from plesna.models.graphs import Node
|
from plesna.models.graphs import Edge, EdgeOnSet, Node
|
||||||
from plesna.models.flux import Flux, Transformation
|
from plesna.models.flux import Flux, Transformation
|
||||||
from plesna.storage.repository.fs_repository import FSRepository
|
from plesna.storage.repository.fs_repository import FSRepository
|
||||||
|
|
||||||
@ -127,14 +127,8 @@ def test_listing_content(dataplatform: DataPlateform):
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
def test_content_from_graph(dataplatform: DataPlateform):
|
def test_content_from_graphset(dataplatform: DataPlateform):
|
||||||
assert dataplatform.graph.nodes == {
|
assert dataplatform.graphset().node_sets == {
|
||||||
Node(name="test-raw-recovery", infos={}),
|
|
||||||
Node(name="test-raw-salary", infos={}),
|
|
||||||
Node(name="test-raw-username", infos={}),
|
|
||||||
}
|
|
||||||
|
|
||||||
assert dataplatform.graphset.node_sets == {
|
|
||||||
frozenset(
|
frozenset(
|
||||||
{
|
{
|
||||||
Node(name="test-bronze-username"),
|
Node(name="test-bronze-username"),
|
||||||
@ -157,6 +151,121 @@ def test_content_from_graph(dataplatform: DataPlateform):
|
|||||||
}
|
}
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
assert dataplatform.graphset().edges == [
|
||||||
|
EdgeOnSet(
|
||||||
|
arrow="foo_flux",
|
||||||
|
sources=[Node(name="test-raw-username"), Node(name="test-raw-recovery")],
|
||||||
|
targets=[Node(name="test-bronze-foo")],
|
||||||
|
metadata={},
|
||||||
|
),
|
||||||
|
EdgeOnSet(
|
||||||
|
arrow="copy_flux",
|
||||||
|
sources=[Node(name="test-raw-username")],
|
||||||
|
targets=[Node(name="test-bronze-username")],
|
||||||
|
metadata={},
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_content_from_graph(dataplatform: DataPlateform):
|
||||||
|
assert dataplatform.graph().nodes == {
|
||||||
|
Node(name="test-raw-recovery", metadata={}),
|
||||||
|
Node(name="test-raw-salary", metadata={}),
|
||||||
|
Node(name="test-raw-username", metadata={}),
|
||||||
|
Node(name="test-bronze-username", metadata={}),
|
||||||
|
Node(name="test-bronze-foo", metadata={}),
|
||||||
|
Node(name="test-raw-username", metadata={}),
|
||||||
|
}
|
||||||
|
assert dataplatform.graph().edges == [
|
||||||
|
Edge(
|
||||||
|
arrow="foo_flux",
|
||||||
|
source=Node(name="test-raw-username"),
|
||||||
|
target=Node(name="test-bronze-foo"),
|
||||||
|
metadata={},
|
||||||
|
),
|
||||||
|
Edge(
|
||||||
|
arrow="foo_flux",
|
||||||
|
source=Node(name="test-raw-recovery"),
|
||||||
|
target=Node(name="test-bronze-foo"),
|
||||||
|
metadata={},
|
||||||
|
),
|
||||||
|
Edge(
|
||||||
|
arrow="copy_flux",
|
||||||
|
source=Node(name="test-raw-username"),
|
||||||
|
target=Node(name="test-bronze-username"),
|
||||||
|
metadata={},
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_content_from_graph_arguments(dataplatform: DataPlateform):
|
||||||
|
name_flux = lambda flux: f"flux-{flux.id}"
|
||||||
|
meta_flux = lambda flux: {"name": flux.name}
|
||||||
|
meta_table = lambda table: {"id": table.id, "partitions": table.partitions}
|
||||||
|
assert dataplatform.graph(
|
||||||
|
name_flux=name_flux, meta_flux=meta_flux, meta_table=meta_table
|
||||||
|
).nodes == {
|
||||||
|
Node(name="test-bronze-foo", metadata={"id": "test-bronze-foo", "partitions": []}),
|
||||||
|
Node(
|
||||||
|
name="test-raw-salary", metadata={"id": "test-raw-salary", "partitions": ["salary.pdf"]}
|
||||||
|
),
|
||||||
|
Node(
|
||||||
|
name="test-raw-recovery",
|
||||||
|
metadata={
|
||||||
|
"id": "test-raw-recovery",
|
||||||
|
"partitions": ["2022.csv", "2023.csv", "2024.csv"],
|
||||||
|
},
|
||||||
|
),
|
||||||
|
Node(
|
||||||
|
name="test-bronze-username", metadata={"id": "test-bronze-username", "partitions": []}
|
||||||
|
),
|
||||||
|
Node(
|
||||||
|
name="test-raw-username",
|
||||||
|
metadata={"id": "test-raw-username", "partitions": ["username.csv"]},
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
assert dataplatform.graph(
|
||||||
|
name_flux=name_flux, meta_flux=meta_flux, meta_table=meta_table
|
||||||
|
).edges == [
|
||||||
|
Edge(
|
||||||
|
arrow="flux-foo_flux",
|
||||||
|
source=Node(
|
||||||
|
name="test-raw-username",
|
||||||
|
metadata={"id": "test-raw-username", "partitions": ["username.csv"]},
|
||||||
|
),
|
||||||
|
target=Node(
|
||||||
|
name="test-bronze-foo", metadata={"id": "test-bronze-foo", "partitions": []}
|
||||||
|
),
|
||||||
|
metadata={"name": "foo"},
|
||||||
|
),
|
||||||
|
Edge(
|
||||||
|
arrow="flux-foo_flux",
|
||||||
|
source=Node(
|
||||||
|
name="test-raw-recovery",
|
||||||
|
metadata={
|
||||||
|
"id": "test-raw-recovery",
|
||||||
|
"partitions": ["2022.csv", "2023.csv", "2024.csv"],
|
||||||
|
},
|
||||||
|
),
|
||||||
|
target=Node(
|
||||||
|
name="test-bronze-foo", metadata={"id": "test-bronze-foo", "partitions": []}
|
||||||
|
),
|
||||||
|
metadata={"name": "foo"},
|
||||||
|
),
|
||||||
|
Edge(
|
||||||
|
arrow="flux-copy_flux",
|
||||||
|
source=Node(
|
||||||
|
name="test-raw-username",
|
||||||
|
metadata={"id": "test-raw-username", "partitions": ["username.csv"]},
|
||||||
|
),
|
||||||
|
target=Node(
|
||||||
|
name="test-bronze-username",
|
||||||
|
metadata={"id": "test-bronze-username", "partitions": []},
|
||||||
|
),
|
||||||
|
metadata={"name": "copy"},
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
def test_execute_flux(dataplatform: DataPlateform):
|
def test_execute_flux(dataplatform: DataPlateform):
|
||||||
|
Loading…
Reference in New Issue
Block a user