diff --git a/plesna/dataplatform.py b/plesna/dataplatform.py index 2ccfe3a..b2e78ca 100644 --- a/plesna/dataplatform.py +++ b/plesna/dataplatform.py @@ -1,3 +1,4 @@ +from collections.abc import Callable from plesna.compute.consume_flux import consume_flux from plesna.graph.graph import Graph, Node from plesna.graph.graph_set import EdgeOnSet, GraphSet @@ -49,20 +50,29 @@ class DataPlateform: raise DataPlateformError("The flux {name} is not registered") return consume_flux(self._fluxes[name]) - @property - def graphset(self) -> GraphSet: + def graphset( + self, + name_arrow: Callable = lambda flux: flux.id, + meta_edge: Callable = lambda _: {}, + meta_table: Callable = lambda table: Node(name=table.id), + ) -> GraphSet: graphset = GraphSet() for flux in self._fluxes.values(): - edge = flux_to_edgeonset(flux) + edge = flux_to_edgeonset(flux, name_arrow, meta_edge, meta_table) graphset.append(edge) return graphset - @property - def graph(self) -> Graph: - graph = self.graphset.to_graph() + def graph( + self, + name_arrow: Callable = lambda flux: flux.id, + meta_edge: Callable = lambda _: {}, + meta_table: Callable = lambda table: Node(name=table.id), + ) -> Graph: + graph = self.graphset(name_arrow, meta_edge, meta_table).to_graph() for repo in self._repositories.values(): for schema in repo.schemas(): for table in repo.tables(schema): - graph.add_node(Node(name=table)) + t = repo.table(table) + graph.add_node(meta_table(t)) return graph diff --git a/plesna/models/libs/flux_graph.py b/plesna/models/libs/flux_graph.py index e070104..d4961c9 100644 --- a/plesna/models/libs/flux_graph.py +++ b/plesna/models/libs/flux_graph.py @@ -5,18 +5,21 @@ from plesna.models.graphs import EdgeOnSet, Node def flux_to_edgeonset( flux: Flux, - meta_arrow: Callable = lambda flux: flux.id, + name_arrow: Callable = lambda flux: flux.id, + meta_edge: Callable = lambda _: {}, meta_table: Callable = lambda table: Node(name=table.id), ) -> EdgeOnSet: """Convert a flux to an EdgeOnSet :param flux: the flux - :meta_arrow: function on flux which returns things to store in arrow field + :meta_arrow: function on flux which returns the name of the arrow from flux + :meta_edge: function on flux which returns a dict to store in metadata field :meta_table: function on table which returns a Node """ return EdgeOnSet( - arrow=meta_arrow(flux), + arrow=name_arrow(flux), sources=[meta_table(s) for s in flux.sources], targets=[meta_table(t) for t in flux.targets], + metadata=meta_edge(flux), ) diff --git a/tests/dataplatform/test_dataplateform.py b/tests/dataplatform/test_dataplateform.py index da8a4d5..755d128 100644 --- a/tests/dataplatform/test_dataplateform.py +++ b/tests/dataplatform/test_dataplateform.py @@ -128,7 +128,7 @@ def test_listing_content(dataplatform: DataPlateform): def test_content_from_graphset(dataplatform: DataPlateform): - assert dataplatform.graphset.node_sets == { + assert dataplatform.graphset().node_sets == { frozenset( { Node(name="test-bronze-username"), @@ -151,7 +151,7 @@ def test_content_from_graphset(dataplatform: DataPlateform): } ), } - assert dataplatform.graphset.edges == [ + assert dataplatform.graphset().edges == [ EdgeOnSet( arrow="foo_flux", sources=[Node(name="test-raw-username"), Node(name="test-raw-recovery")], @@ -168,7 +168,7 @@ def test_content_from_graphset(dataplatform: DataPlateform): def test_content_from_graph(dataplatform: DataPlateform): - assert dataplatform.graph.nodes == { + assert dataplatform.graph().nodes == { Node(name="test-raw-recovery", infos={}), Node(name="test-raw-salary", infos={}), Node(name="test-raw-username", infos={}), @@ -176,7 +176,7 @@ def test_content_from_graph(dataplatform: DataPlateform): Node(name="test-bronze-foo", infos={}), Node(name="test-raw-username", infos={}), } - assert dataplatform.graph.edges == [ + assert dataplatform.graph().edges == [ Edge( arrow="foo_flux", source=Node(name="test-raw-username"),