test: add test on graph for dataplatform

This commit is contained in:
Bertrand Benjamin 2025-01-05 18:27:26 +01:00
parent 5ebde14be9
commit 2f170d91b6
4 changed files with 55 additions and 11 deletions

View File

@ -60,7 +60,7 @@ class DataPlateform:
@property
def graph(self) -> Graph:
graph = Graph()
graph = self.graphset.to_graph()
for repo in self._repositories.values():
for schema in repo.schemas():
for table in repo.tables(schema):

View File

@ -1,3 +1,4 @@
from typing import Set
from pydantic import BaseModel
from functools import reduce
from plesna.models.graphs import Node, Edge
@ -5,8 +6,8 @@ from plesna.models.graphs import Node, Edge
class Graph:
def __init__(self, nodes: list[Node] = [], edges: list[Edge] = []):
self._edges = []
self._nodes = set()
self._edges: list[Edge] = []
self._nodes: Set[Node] = set()
self.add_edges(edges)
self.add_nodes(nodes)

View File

@ -14,6 +14,10 @@ class GraphSet:
self._node_sets.add(frozenset(edge.sources))
self._node_sets.add(frozenset(edge.targets))
@property
def edges(self) -> Set[EdgeOnSet]:
return self._edges
@property
def node_sets(self) -> Set[frozenset]:
return self._node_sets

View File

@ -4,7 +4,7 @@ from pathlib import Path
import pytest
from plesna.dataplatform import DataPlateform
from plesna.models.graphs import Node
from plesna.models.graphs import Edge, EdgeOnSet, Node
from plesna.models.flux import Flux, Transformation
from plesna.storage.repository.fs_repository import FSRepository
@ -127,13 +127,7 @@ def test_listing_content(dataplatform: DataPlateform):
]
def test_content_from_graph(dataplatform: DataPlateform):
assert dataplatform.graph.nodes == {
Node(name="test-raw-recovery", infos={}),
Node(name="test-raw-salary", infos={}),
Node(name="test-raw-username", infos={}),
}
def test_content_from_graphset(dataplatform: DataPlateform):
assert dataplatform.graphset.node_sets == {
frozenset(
{
@ -157,6 +151,51 @@ def test_content_from_graph(dataplatform: DataPlateform):
}
),
}
assert dataplatform.graphset.edges == [
EdgeOnSet(
arrow="foo_flux",
sources=[Node(name="test-raw-username"), Node(name="test-raw-recovery")],
targets=[Node(name="test-bronze-foo")],
edge_kwrds={},
),
EdgeOnSet(
arrow="copy_flux",
sources=[Node(name="test-raw-username")],
targets=[Node(name="test-bronze-username")],
edge_kwrds={},
),
]
def test_content_from_graph(dataplatform: DataPlateform):
assert dataplatform.graph.nodes == {
Node(name="test-raw-recovery", infos={}),
Node(name="test-raw-salary", infos={}),
Node(name="test-raw-username", infos={}),
Node(name="test-bronze-username", infos={}),
Node(name="test-bronze-foo", infos={}),
Node(name="test-raw-username", infos={}),
}
assert dataplatform.graph.edges == [
Edge(
arrow="foo_flux",
source=Node(name="test-raw-username"),
target=Node(name="test-bronze-foo"),
edge_kwrds={},
),
Edge(
arrow="foo_flux",
source=Node(name="test-raw-recovery"),
target=Node(name="test-bronze-foo"),
edge_kwrds={},
),
Edge(
arrow="copy_flux",
source=Node(name="test-raw-username"),
target=Node(name="test-bronze-username"),
edge_kwrds={},
),
]
def test_execute_flux(dataplatform: DataPlateform):