refactor: rename parameters of the graph conversion functions
This commit is contained in:
parent
90472ac868
commit
bb691acc14
@ -1,8 +1,9 @@
|
||||
from collections.abc import Callable
|
||||
from plesna.compute.consume_flux import consume_flux
|
||||
from plesna.graph.graph import Graph, Node
|
||||
from plesna.graph.graph_set import EdgeOnSet, GraphSet
|
||||
from plesna.graph.graph import Graph
|
||||
from plesna.graph.graph_set import GraphSet
|
||||
from plesna.models.flux import Flux, FluxMetaData
|
||||
from plesna.models.graphs import Node
|
||||
from plesna.models.libs.flux_graph import flux_to_edgeonset
|
||||
from plesna.storage.repository.repository import Repository
|
||||
|
||||
@ -52,27 +53,37 @@ class DataPlateform:
|
||||
|
||||
def graphset(
|
||||
self,
|
||||
name_arrow: Callable = lambda flux: flux.id,
|
||||
meta_edge: Callable = lambda _: {},
|
||||
meta_table: Callable = lambda table: Node(name=table.id),
|
||||
name_flux: Callable = lambda flux: flux.id,
|
||||
meta_flux: Callable = lambda _: {},
|
||||
name_table: Callable = lambda table: table.id,
|
||||
meta_table: Callable = lambda _: {},
|
||||
) -> GraphSet:
|
||||
graphset = GraphSet()
|
||||
for flux in self._fluxes.values():
|
||||
edge = flux_to_edgeonset(flux, name_arrow, meta_edge, meta_table)
|
||||
edge = flux_to_edgeonset(flux, name_flux, meta_flux, name_table, meta_table)
|
||||
graphset.append(edge)
|
||||
|
||||
return graphset
|
||||
|
||||
def graph(
|
||||
self,
|
||||
name_arrow: Callable = lambda flux: flux.id,
|
||||
meta_edge: Callable = lambda _: {},
|
||||
meta_table: Callable = lambda table: Node(name=table.id),
|
||||
name_flux: Callable = lambda flux: flux.id,
|
||||
meta_flux: Callable = lambda _: {},
|
||||
name_table: Callable = lambda table: table.id,
|
||||
meta_table: Callable = lambda _: {},
|
||||
) -> Graph:
|
||||
graph = self.graphset(name_arrow, meta_edge, meta_table).to_graph()
|
||||
"""Get the graph of fluxes and tables
|
||||
|
||||
:param name_flux: function on flux to name the edge
|
||||
:param meta_flux: function on flux to attribute metadata to edge
|
||||
:param name_table: function on table to name nodes
|
||||
:param meta_table: function on table to attribute metadata to nodes
|
||||
|
||||
"""
|
||||
graph = self.graphset(name_flux, meta_flux, name_table, meta_table).to_graph()
|
||||
for repo in self._repositories.values():
|
||||
for schema in repo.schemas():
|
||||
for table in repo.tables(schema):
|
||||
t = repo.table(table)
|
||||
graph.add_node(meta_table(t))
|
||||
graph.add_node(Node(name=name_table(t), metadata=meta_table(t)))
|
||||
return graph
|
||||
|
@ -5,21 +5,25 @@ from plesna.models.graphs import EdgeOnSet, Node
|
||||
|
||||
def flux_to_edgeonset(
|
||||
flux: Flux,
|
||||
name_arrow: Callable = lambda flux: flux.id,
|
||||
meta_edge: Callable = lambda _: {},
|
||||
meta_table: Callable = lambda table: Node(name=table.id),
|
||||
name_flux: Callable = lambda flux: flux.id,
|
||||
meta_flux: Callable = lambda _: {},
|
||||
name_table: Callable = lambda table: table.id,
|
||||
meta_table: Callable = lambda _: {},
|
||||
) -> EdgeOnSet:
|
||||
"""Convert a flux to an EdgeOnSet
|
||||
|
||||
:param flux: the flux
|
||||
:meta_arrow: function on flux which returns the name of the arrow from flux
|
||||
:meta_edge: function on flux which returns a dict to store in metadata field
|
||||
:meta_table: function on table which returns a Node
|
||||
:param name_flux: function on flux which returns the name of the arrow
|
||||
:param meta_flux: function on flux which returns a dict to store in the edge metadata field
|
||||
:param name_table: function on table which returns the name of the node
|
||||
:param meta_table: function on table which returns the metadata of the node
|
||||
|
||||
"""
|
||||
sources = [Node(name=name_table(s), metadata=meta_table(s)) for s in flux.sources]
|
||||
targets = [Node(name=name_table(s), metadata=meta_table(s)) for s in flux.targets]
|
||||
return EdgeOnSet(
|
||||
arrow=name_arrow(flux),
|
||||
sources=[meta_table(s) for s in flux.sources],
|
||||
targets=[meta_table(t) for t in flux.targets],
|
||||
metadata=meta_edge(flux),
|
||||
arrow=name_flux(flux),
|
||||
sources=sources,
|
||||
targets=targets,
|
||||
metadata=meta_flux(flux),
|
||||
)
|
||||
|
@ -36,8 +36,9 @@ class Table(BaseModel):
|
||||
schema_id: str
|
||||
name: str
|
||||
value: str
|
||||
partitions: list[str] = []
|
||||
datas: list[str]
|
||||
partitions: list[str] = []
|
||||
metadata: dict = {}
|
||||
|
||||
|
||||
class Partition(BaseModel):
|
||||
|
@ -169,12 +169,12 @@ def test_content_from_graphset(dataplatform: DataPlateform):
|
||||
|
||||
def test_content_from_graph(dataplatform: DataPlateform):
|
||||
assert dataplatform.graph().nodes == {
|
||||
Node(name="test-raw-recovery", infos={}),
|
||||
Node(name="test-raw-salary", infos={}),
|
||||
Node(name="test-raw-username", infos={}),
|
||||
Node(name="test-bronze-username", infos={}),
|
||||
Node(name="test-bronze-foo", infos={}),
|
||||
Node(name="test-raw-username", infos={}),
|
||||
Node(name="test-raw-recovery", metadata={}),
|
||||
Node(name="test-raw-salary", metadata={}),
|
||||
Node(name="test-raw-username", metadata={}),
|
||||
Node(name="test-bronze-username", metadata={}),
|
||||
Node(name="test-bronze-foo", metadata={}),
|
||||
Node(name="test-raw-username", metadata={}),
|
||||
}
|
||||
assert dataplatform.graph().edges == [
|
||||
Edge(
|
||||
@ -198,6 +198,76 @@ def test_content_from_graph(dataplatform: DataPlateform):
|
||||
]
|
||||
|
||||
|
||||
def test_content_from_graph_arguments(dataplatform: DataPlateform):
|
||||
name_flux = lambda flux: f"flux-{flux.id}"
|
||||
meta_flux = lambda flux: {"name": flux.name}
|
||||
meta_table = lambda table: {"id": table.id, "partitions": table.partitions}
|
||||
assert dataplatform.graph(
|
||||
name_flux=name_flux, meta_flux=meta_flux, meta_table=meta_table
|
||||
).nodes == {
|
||||
Node(name="test-bronze-foo", metadata={"id": "test-bronze-foo", "partitions": []}),
|
||||
Node(
|
||||
name="test-raw-salary", metadata={"id": "test-raw-salary", "partitions": ["salary.pdf"]}
|
||||
),
|
||||
Node(
|
||||
name="test-raw-recovery",
|
||||
metadata={
|
||||
"id": "test-raw-recovery",
|
||||
"partitions": ["2022.csv", "2023.csv", "2024.csv"],
|
||||
},
|
||||
),
|
||||
Node(
|
||||
name="test-bronze-username", metadata={"id": "test-bronze-username", "partitions": []}
|
||||
),
|
||||
Node(
|
||||
name="test-raw-username",
|
||||
metadata={"id": "test-raw-username", "partitions": ["username.csv"]},
|
||||
),
|
||||
}
|
||||
|
||||
assert dataplatform.graph(
|
||||
name_flux=name_flux, meta_flux=meta_flux, meta_table=meta_table
|
||||
).edges == [
|
||||
Edge(
|
||||
arrow="flux-foo_flux",
|
||||
source=Node(
|
||||
name="test-raw-username",
|
||||
metadata={"id": "test-raw-username", "partitions": ["username.csv"]},
|
||||
),
|
||||
target=Node(
|
||||
name="test-bronze-foo", metadata={"id": "test-bronze-foo", "partitions": []}
|
||||
),
|
||||
metadata={"name": "foo"},
|
||||
),
|
||||
Edge(
|
||||
arrow="flux-foo_flux",
|
||||
source=Node(
|
||||
name="test-raw-recovery",
|
||||
metadata={
|
||||
"id": "test-raw-recovery",
|
||||
"partitions": ["2022.csv", "2023.csv", "2024.csv"],
|
||||
},
|
||||
),
|
||||
target=Node(
|
||||
name="test-bronze-foo", metadata={"id": "test-bronze-foo", "partitions": []}
|
||||
),
|
||||
metadata={"name": "foo"},
|
||||
),
|
||||
Edge(
|
||||
arrow="flux-copy_flux",
|
||||
source=Node(
|
||||
name="test-raw-username",
|
||||
metadata={"id": "test-raw-username", "partitions": ["username.csv"]},
|
||||
),
|
||||
target=Node(
|
||||
name="test-bronze-username",
|
||||
metadata={"id": "test-bronze-username", "partitions": []},
|
||||
),
|
||||
metadata={"name": "copy"},
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def test_execute_flux(dataplatform: DataPlateform):
|
||||
meta = dataplatform.execute_flux("foo")
|
||||
assert meta.data == {"who": "foo"}
|
||||
|
Loading…
Reference in New Issue
Block a user