diff --git a/plesna/dataplatform.py b/plesna/dataplatform.py
index 00bff67..2ccfe3a 100644
--- a/plesna/dataplatform.py
+++ b/plesna/dataplatform.py
@@ -60,7 +60,7 @@ class DataPlateform:
 
     @property
     def graph(self) -> Graph:
-        graph = Graph()
+        graph = self.graphset.to_graph()
         for repo in self._repositories.values():
             for schema in repo.schemas():
                 for table in repo.tables(schema):
diff --git a/plesna/graph/graph.py b/plesna/graph/graph.py
index be5f28b..243316b 100644
--- a/plesna/graph/graph.py
+++ b/plesna/graph/graph.py
@@ -1,3 +1,4 @@
+from typing import Set
 from pydantic import BaseModel
 from functools import reduce
 from plesna.models.graphs import Node, Edge
@@ -5,8 +6,8 @@ from plesna.models.graphs import Node, Edge
 
 class Graph:
     def __init__(self, nodes: list[Node] = [], edges: list[Edge] = []):
-        self._edges = []
-        self._nodes = set()
+        self._edges: list[Edge] = []
+        self._nodes: Set[Node] = set()
         self.add_edges(edges)
         self.add_nodes(nodes)
 
diff --git a/plesna/graph/graph_set.py b/plesna/graph/graph_set.py
index df856a5..c9e71b9 100644
--- a/plesna/graph/graph_set.py
+++ b/plesna/graph/graph_set.py
@@ -14,6 +14,10 @@ class GraphSet:
         self._node_sets.add(frozenset(edge.sources))
         self._node_sets.add(frozenset(edge.targets))
 
+    @property
+    def edges(self) -> Set[EdgeOnSet]:
+        return self._edges
+
     @property
     def node_sets(self) -> Set[frozenset]:
         return self._node_sets
diff --git a/tests/dataplatform/test_dataplateform.py b/tests/dataplatform/test_dataplateform.py
index ea9416d..64ee7e5 100644
--- a/tests/dataplatform/test_dataplateform.py
+++ b/tests/dataplatform/test_dataplateform.py
@@ -4,7 +4,7 @@ from pathlib import Path
 import pytest
 
 from plesna.dataplatform import DataPlateform
-from plesna.models.graphs import Node
+from plesna.models.graphs import Edge, EdgeOnSet, Node
 from plesna.models.flux import Flux, Transformation
 from plesna.storage.repository.fs_repository import FSRepository
 
@@ -127,13 +127,7 @@ def test_listing_content(dataplatform: DataPlateform):
     ]
 
 
-def test_content_from_graph(dataplatform: DataPlateform):
-    assert dataplatform.graph.nodes == {
-        Node(name="test-raw-recovery", infos={}),
-        Node(name="test-raw-salary", infos={}),
-        Node(name="test-raw-username", infos={}),
-    }
-
+def test_content_from_graphset(dataplatform: DataPlateform):
     assert dataplatform.graphset.node_sets == {
         frozenset(
             {
@@ -157,6 +151,51 @@ def test_content_from_graph(dataplatform: DataPlateform):
             }
         ),
     }
+    assert dataplatform.graphset.edges == [
+        EdgeOnSet(
+            arrow="foo_flux",
+            sources=[Node(name="test-raw-username"), Node(name="test-raw-recovery")],
+            targets=[Node(name="test-bronze-foo")],
+            edge_kwrds={},
+        ),
+        EdgeOnSet(
+            arrow="copy_flux",
+            sources=[Node(name="test-raw-username")],
+            targets=[Node(name="test-bronze-username")],
+            edge_kwrds={},
+        ),
+    ]
+
+
+def test_content_from_graph(dataplatform: DataPlateform):
+    assert dataplatform.graph.nodes == {
+        Node(name="test-raw-recovery", infos={}),
+        Node(name="test-raw-salary", infos={}),
+        Node(name="test-raw-username", infos={}),
+        Node(name="test-bronze-username", infos={}),
+        Node(name="test-bronze-foo", infos={}),
+        Node(name="test-raw-username", infos={}),
+    }
+    assert dataplatform.graph.edges == [
+        Edge(
+            arrow="foo_flux",
+            source=Node(name="test-raw-username"),
+            target=Node(name="test-bronze-foo"),
+            edge_kwrds={},
+        ),
+        Edge(
+            arrow="foo_flux",
+            source=Node(name="test-raw-recovery"),
+            target=Node(name="test-bronze-foo"),
+            edge_kwrds={},
+        ),
+        Edge(
+            arrow="copy_flux",
+            source=Node(name="test-raw-username"),
+            target=Node(name="test-bronze-username"),
+            edge_kwrds={},
+        ),
+    ]
 
 
 def test_execute_flux(dataplatform: DataPlateform):