Compare commits

...

2 Commits

5 changed files with 79 additions and 38 deletions

View File

@ -12,31 +12,15 @@ class DataPlateformError(Exception):
class DataPlateform:
def __init__(self):
self._graphset = GraphSet()
self._graph = Graph()
self._metadata_engine = ""
self._fluxes = {}
self._repositories = {}
@property
def graphset(self) -> GraphSet:
return self._graphset
@property
def graph(self) -> Graph:
return self._graph
def repository_graph_feed(self, repository_id: str):
for schema in self._repositories[repository_id].schemas():
for table in self._repositories[repository_id].tables(schema):
self._graph.add_node(Node(name=table))
def add_repository(self, repository: Repository) -> str:
if repository.id in self._repositories:
raise DataPlateformError("The repository {repository.id} already exists")
self._repositories[repository.id] = repository
self.repository_graph_feed(repository.id)
return repository.id
@property
@ -51,14 +35,8 @@ class DataPlateform:
raise DataPlateformError("The flux {name} already exists")
self._fluxes[name] = flux
self.flux_graphset_feed(name)
return name
def flux_graphset_feed(self, flux_name: str):
flux = self.flux(flux_name)
edge = flux_to_edgeonset(flux)
self._graphset.append(edge)
@property
def fluxes(self) -> list[str]:
return list(self._fluxes)
@ -70,3 +48,21 @@ class DataPlateform:
if name not in self._fluxes:
raise DataPlateformError("The flux {name} is not registered")
return consume_flux(self._fluxes[name])
@property
def graphset(self) -> GraphSet:
graphset = GraphSet()
for flux in self._fluxes.values():
edge = flux_to_edgeonset(flux)
graphset.append(edge)
return graphset
@property
def graph(self) -> Graph:
graph = Graph()
for repo in self._repositories.values():
for schema in repo.schemas():
for table in repo.tables(schema):
graph.add_node(Node(name=table))
return graph

View File

@ -1,4 +1,7 @@
from plesna.models.graphs import EdgeOnSet
from typing import Set
from plesna.graph.graph import Graph
from plesna.models.graphs import Edge, EdgeOnSet
from itertools import product
class GraphSet:
@ -12,8 +15,21 @@ class GraphSet:
self._node_sets.add(frozenset(edge.targets))
@property
def node_sets(self):
def node_sets(self) -> Set[frozenset]:
return self._node_sets
def is_valid_dag(self):
pass
def to_graph(self) -> Graph:
graph = Graph()
for node_set in self.node_sets:
graph.add_nodes(node_set)
for edge in self._edges:
flatten_edge = [
Edge(arrow=edge.arrow, source=s, target=t, edge_kwrds=edge.edge_kwrds)
for (s, t) in product(edge.sources, edge.targets)
]
graph.add_edges(flatten_edge)
return graph
def is_valid_dag(self) -> bool:
return self.to_graph().is_dag()

View File

@ -9,7 +9,7 @@ class Node(BaseModel):
class Edge(BaseModel):
arrow_name: str
arrow: str
source: Node
target: Node
edge_kwrds: dict = {}
@ -19,3 +19,4 @@ class EdgeOnSet(BaseModel):
arrow: str
sources: list[Node]
targets: list[Node]
edge_kwrds: dict = {}

View File

@ -20,8 +20,8 @@ def test_append_edges():
nodeB = Node(name="B")
nodeC = Node(name="C")
edge1 = Edge(arrow_name="arrow", source=nodeA, target=nodeC)
edge2 = Edge(arrow_name="arrow", source=nodeB, target=nodeC)
edge1 = Edge(arrow="arrow", source=nodeA, target=nodeC)
edge2 = Edge(arrow="arrow", source=nodeB, target=nodeC)
graph = Graph()
graph.add_edge(edge1)
@ -35,7 +35,7 @@ def test_init_edges_nodes():
nodeB = Node(name="B")
nodeC = Node(name="C")
edge1 = Edge(arrow_name="arrow", source=nodeB, target=nodeC)
edge1 = Edge(arrow="arrow", source=nodeB, target=nodeC)
graph = Graph()
graph.add_node(nodeA)
@ -57,19 +57,19 @@ def nodes():
@pytest.fixture
def dag_edges(nodes):
return {
"1": Edge(arrow_name="arrow", source=nodes["A"], target=nodes["C"]),
"2": Edge(arrow_name="arrow", source=nodes["B"], target=nodes["C"]),
"3": Edge(arrow_name="arrow", source=nodes["C"], target=nodes["D"]),
"1": Edge(arrow="arrow", source=nodes["A"], target=nodes["C"]),
"2": Edge(arrow="arrow", source=nodes["B"], target=nodes["C"]),
"3": Edge(arrow="arrow", source=nodes["C"], target=nodes["D"]),
}
@pytest.fixture
def notdag_edges(nodes):
return {
"1": Edge(arrow_name="arrow", source=nodes["A"], target=nodes["C"]),
"2": Edge(arrow_name="arrow", source=nodes["B"], target=nodes["C"]),
"3": Edge(arrow_name="arrow", source=nodes["C"], target=nodes["D"]),
"4": Edge(arrow_name="arrow", source=nodes["D"], target=nodes["B"]),
"1": Edge(arrow="arrow", source=nodes["A"], target=nodes["C"]),
"2": Edge(arrow="arrow", source=nodes["B"], target=nodes["C"]),
"3": Edge(arrow="arrow", source=nodes["C"], target=nodes["D"]),
"4": Edge(arrow="arrow", source=nodes["D"], target=nodes["B"]),
}

View File

@ -1,5 +1,6 @@
from plesna.graph.graph import Graph
from plesna.graph.graph_set import GraphSet
from plesna.models.graphs import EdgeOnSet, Node
from plesna.models.graphs import Edge, EdgeOnSet, Node
def test_init():
@ -13,3 +14,30 @@ def test_init():
graph_set.append(edge1)
assert graph_set.node_sets == {frozenset([nodeA, nodeB]), frozenset([nodeC])}
def test_to_graph():
graph_set = GraphSet()
nodeA = Node(name="A")
nodeB = Node(name="B")
nodeC = Node(name="C")
nodeD = Node(name="D")
edge1 = EdgeOnSet(arrow="arrow-AB-C", sources=[nodeA, nodeB], targets=[nodeC])
edge2 = EdgeOnSet(arrow="arrow-C-D", sources=[nodeC], targets=[nodeD])
graph_set.append(edge1)
graph_set.append(edge2)
graph = graph_set.to_graph()
assert graph.nodes == {
nodeA,
nodeB,
nodeC,
nodeD,
}
assert graph.edges == [
Edge(arrow="arrow-AB-C", source=nodeA, target=nodeC),
Edge(arrow="arrow-AB-C", source=nodeB, target=nodeC),
Edge(arrow="arrow-C-D", source=nodeC, target=nodeD),
]