From bb691acc148f662ec5504a0bea01653071b013d1 Mon Sep 17 00:00:00 2001 From: Bertrand Benjamin Date: Sat, 11 Jan 2025 06:35:40 +0100 Subject: [PATCH] refact: rename parameters in converting to graph function --- plesna/dataplatform.py | 33 ++++++---- plesna/models/libs/flux_graph.py | 24 ++++--- plesna/models/storage.py | 3 +- tests/dataplatform/test_dataplateform.py | 82 ++++++++++++++++++++++-- 4 files changed, 114 insertions(+), 28 deletions(-) diff --git a/plesna/dataplatform.py b/plesna/dataplatform.py index b2e78ca..0f47384 100644 --- a/plesna/dataplatform.py +++ b/plesna/dataplatform.py @@ -1,8 +1,9 @@ from collections.abc import Callable from plesna.compute.consume_flux import consume_flux -from plesna.graph.graph import Graph, Node -from plesna.graph.graph_set import EdgeOnSet, GraphSet +from plesna.graph.graph import Graph +from plesna.graph.graph_set import GraphSet from plesna.models.flux import Flux, FluxMetaData +from plesna.models.graphs import Node from plesna.models.libs.flux_graph import flux_to_edgeonset from plesna.storage.repository.repository import Repository @@ -52,27 +53,37 @@ class DataPlateform: def graphset( self, - name_arrow: Callable = lambda flux: flux.id, - meta_edge: Callable = lambda _: {}, - meta_table: Callable = lambda table: Node(name=table.id), + name_flux: Callable = lambda flux: flux.id, + meta_flux: Callable = lambda _: {}, + name_table: Callable = lambda table: table.id, + meta_table: Callable = lambda _: {}, ) -> GraphSet: graphset = GraphSet() for flux in self._fluxes.values(): - edge = flux_to_edgeonset(flux, name_arrow, meta_edge, meta_table) + edge = flux_to_edgeonset(flux, name_flux, meta_flux, name_table, meta_table) graphset.append(edge) return graphset def graph( self, - name_arrow: Callable = lambda flux: flux.id, - meta_edge: Callable = lambda _: {}, - meta_table: Callable = lambda table: Node(name=table.id), + name_flux: Callable = lambda flux: flux.id, + meta_flux: Callable = lambda _: {}, + name_table: Callable = lambda table: table.id, + meta_table: Callable = lambda _: {}, ) -> Graph: - graph = self.graphset(name_arrow, meta_edge, meta_table).to_graph() + """Get the graph of fluxes and tables + + :param name_flux: function on flux to name the edge + :param meta_flux: function on flux to attribute metadata to edge + :param name_table: function on table to name nodes + :param meta_table: function on flux to attribute metadata to nodes + + """ + graph = self.graphset(name_flux, meta_flux, name_table, meta_table).to_graph() for repo in self._repositories.values(): for schema in repo.schemas(): for table in repo.tables(schema): t = repo.table(table) - graph.add_node(meta_table(t)) + graph.add_node(Node(name=name_table(t), metadata=meta_table(t))) return graph diff --git a/plesna/models/libs/flux_graph.py b/plesna/models/libs/flux_graph.py index d4961c9..a86708a 100644 --- a/plesna/models/libs/flux_graph.py +++ b/plesna/models/libs/flux_graph.py @@ -5,21 +5,25 @@ from plesna.models.graphs import EdgeOnSet, Node def flux_to_edgeonset( flux: Flux, - name_arrow: Callable = lambda flux: flux.id, - meta_edge: Callable = lambda _: {}, - meta_table: Callable = lambda table: Node(name=table.id), + name_flux: Callable = lambda flux: flux.id, + meta_flux: Callable = lambda _: {}, + name_table: Callable = lambda table: table.id, + meta_table: Callable = lambda _: {}, ) -> EdgeOnSet: """Convert a flux to an EdgeOnSet :param flux: the flux - :meta_arrow: function on flux which returns the name of the arrow from flux - :meta_edge: function on flux which returns a dict to store in metadata field - :meta_table: function on table which returns a Node + :name_flux: function on flux which returns the name of the arrow from flux + :meta_flux: function on flux which returns a dict to store in metadata field + :name_table: function on table which returns the name of node + :meta_table: function on table which returns metadata of node """ + sources = [Node(name=name_table(s), metadata=meta_table(s)) for s in flux.sources] + targets = [Node(name=name_table(s), metadata=meta_table(s)) for s in flux.targets] return EdgeOnSet( - arrow=name_arrow(flux), - sources=[meta_table(s) for s in flux.sources], - targets=[meta_table(t) for t in flux.targets], - metadata=meta_edge(flux), + arrow=name_flux(flux), + sources=sources, + targets=targets, + metadata=meta_flux(flux), ) diff --git a/plesna/models/storage.py b/plesna/models/storage.py index 395ffa4..29fcc57 100644 --- a/plesna/models/storage.py +++ b/plesna/models/storage.py @@ -36,8 +36,9 @@ class Table(BaseModel): schema_id: str name: str value: str - partitions: list[str] = [] datas: list[str] + partitions: list[str] = [] + metadata: dict = {} class Partition(BaseModel): diff --git a/tests/dataplatform/test_dataplateform.py b/tests/dataplatform/test_dataplateform.py index 755d128..e2b0ce3 100644 --- a/tests/dataplatform/test_dataplateform.py +++ b/tests/dataplatform/test_dataplateform.py @@ -169,12 +169,12 @@ def test_content_from_graphset(dataplatform: DataPlateform): def test_content_from_graph(dataplatform: DataPlateform): assert dataplatform.graph().nodes == { - Node(name="test-raw-recovery", infos={}), - Node(name="test-raw-salary", infos={}), - Node(name="test-raw-username", infos={}), - Node(name="test-bronze-username", infos={}), - Node(name="test-bronze-foo", infos={}), - Node(name="test-raw-username", infos={}), + Node(name="test-raw-recovery", metadata={}), + Node(name="test-raw-salary", metadata={}), + Node(name="test-raw-username", metadata={}), + Node(name="test-bronze-username", metadata={}), + Node(name="test-bronze-foo", metadata={}), + Node(name="test-raw-username", metadata={}), } assert dataplatform.graph().edges == [ Edge( @@ -198,6 +198,76 @@ def test_content_from_graph(dataplatform: DataPlateform): ] +def test_content_from_graph_arguments(dataplatform: DataPlateform): + name_flux = lambda flux: f"flux-{flux.id}" + meta_flux = lambda flux: {"name": flux.name} + meta_table = lambda table: {"id": table.id, "partitions": table.partitions} + assert dataplatform.graph( + name_flux=name_flux, meta_flux=meta_flux, meta_table=meta_table + ).nodes == { + Node(name="test-bronze-foo", metadata={"id": "test-bronze-foo", "partitions": []}), + Node( + name="test-raw-salary", metadata={"id": "test-raw-salary", "partitions": ["salary.pdf"]} + ), + Node( + name="test-raw-recovery", + metadata={ + "id": "test-raw-recovery", + "partitions": ["2022.csv", "2023.csv", "2024.csv"], + }, + ), + Node( + name="test-bronze-username", metadata={"id": "test-bronze-username", "partitions": []} + ), + Node( + name="test-raw-username", + metadata={"id": "test-raw-username", "partitions": ["username.csv"]}, + ), + } + + assert dataplatform.graph( + name_flux=name_flux, meta_flux=meta_flux, meta_table=meta_table + ).edges == [ + Edge( + arrow="flux-foo_flux", + source=Node( + name="test-raw-username", + metadata={"id": "test-raw-username", "partitions": ["username.csv"]}, + ), + target=Node( + name="test-bronze-foo", metadata={"id": "test-bronze-foo", "partitions": []} + ), + metadata={"name": "foo"}, + ), + Edge( + arrow="flux-foo_flux", + source=Node( + name="test-raw-recovery", + metadata={ + "id": "test-raw-recovery", + "partitions": ["2022.csv", "2023.csv", "2024.csv"], + }, + ), + target=Node( + name="test-bronze-foo", metadata={"id": "test-bronze-foo", "partitions": []} + ), + metadata={"name": "foo"}, + ), + Edge( + arrow="flux-copy_flux", + source=Node( + name="test-raw-username", + metadata={"id": "test-raw-username", "partitions": ["username.csv"]}, + ), + target=Node( + name="test-bronze-username", + metadata={"id": "test-bronze-username", "partitions": []}, + ), + metadata={"name": "copy"}, + ), + ] + + def test_execute_flux(dataplatform: DataPlateform): meta = dataplatform.execute_flux("foo") assert meta.data == {"who": "foo"}