281 lines
8.4 KiB
Python
281 lines
8.4 KiB
Python
import shutil
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from plesna.dataplatform import DataPlateform
|
|
from plesna.models.graphs import Edge, EdgeOnSet, Node
|
|
from plesna.models.flux import Flux, Transformation
|
|
from plesna.storage.data_repository.fs_data_repository import FSDataRepository
|
|
|
|
FIXTURE_DIR = Path(__file__).parent.parent / Path("raw_datas")
|
|
|
|
|
|
@pytest.fixture
|
|
def repository(tmp_path) -> FSDataRepository:
|
|
example_src = FIXTURE_DIR
|
|
assert example_src.exists()
|
|
|
|
raw_path = Path(tmp_path) / "raw"
|
|
shutil.copytree(src=example_src.absolute(), dst=raw_path.absolute())
|
|
|
|
bronze_path = Path(tmp_path) / "bronze"
|
|
bronze_path.mkdir()
|
|
silver_path = Path(tmp_path) / "silver"
|
|
silver_path.mkdir()
|
|
|
|
return FSDataRepository("test", "test", tmp_path)
|
|
|
|
|
|
def test_add_repository(
|
|
repository: FSDataRepository,
|
|
):
|
|
dp = DataPlateform()
|
|
dp.add_repository(repository)
|
|
|
|
assert dp.repositories == ["test"]
|
|
|
|
assert dp.repository("test") == repository
|
|
|
|
|
|
@pytest.fixture
|
|
def copy_flux(repository: FSDataRepository) -> Flux:
|
|
raw_username = [repository.table("test-raw-username")]
|
|
bronze_username = [repository.table("test-bronze-username")]
|
|
|
|
def copy(sources, targets):
|
|
src_path = Path(sources["test-raw-username"].datas[0])
|
|
tgt_path = Path(targets["test-bronze-username"].datas[0])
|
|
shutil.copy(src_path, tgt_path)
|
|
return {"src_size": src_path.stat().st_size, "tgt_size": tgt_path.stat().st_size}
|
|
|
|
extra_kwrds = {}
|
|
|
|
raw_brz_copy_username = Flux(
|
|
id="copy_flux",
|
|
name="copy",
|
|
sources=raw_username,
|
|
targets=bronze_username,
|
|
transformation=Transformation(function=copy, extra_kwrds=extra_kwrds),
|
|
)
|
|
return raw_brz_copy_username
|
|
|
|
|
|
@pytest.fixture
|
|
def foo_flux(repository: FSDataRepository) -> Flux:
|
|
src = [
|
|
repository.table("test-raw-username"),
|
|
repository.table("test-raw-recovery"),
|
|
]
|
|
targets = [repository.table("test-bronze-foo")]
|
|
|
|
def foo(sources, targets):
|
|
return {"who": "foo"}
|
|
|
|
extra_kwrds = {}
|
|
|
|
flux = Flux(
|
|
id="foo_flux",
|
|
name="foo",
|
|
sources=src,
|
|
targets=targets,
|
|
transformation=Transformation(function=foo, extra_kwrds=extra_kwrds),
|
|
)
|
|
return flux
|
|
|
|
|
|
def test_add_flux(repository: FSDataRepository, copy_flux: Flux, foo_flux: Flux):
|
|
dataplatform = DataPlateform()
|
|
dataplatform.add_repository(repository)
|
|
|
|
dataplatform.add_flux(flux=copy_flux)
|
|
assert dataplatform.fluxes == ["copy_flux"]
|
|
dataplatform.add_flux(flux=foo_flux)
|
|
assert dataplatform.fluxes == ["copy_flux", "foo_flux"]
|
|
|
|
assert dataplatform.flux("copy_flux") == copy_flux
|
|
assert dataplatform.flux("foo_flux") == foo_flux
|
|
|
|
|
|
@pytest.fixture
|
|
def dataplatform(
|
|
repository: FSDataRepository,
|
|
foo_flux: Flux,
|
|
copy_flux: Flux,
|
|
) -> DataPlateform:
|
|
dp = DataPlateform()
|
|
|
|
dp.add_repository(repository)
|
|
|
|
dp.add_flux(foo_flux)
|
|
dp.add_flux(copy_flux)
|
|
return dp
|
|
|
|
|
|
def test_listing_content(dataplatform: DataPlateform):
|
|
assert dataplatform.repository("test").schemas() == ["test-raw", "test-bronze", "test-silver"]
|
|
assert dataplatform.repository("test").schema("test-raw").tables == [
|
|
"test-raw-username",
|
|
"test-raw-recovery",
|
|
"test-raw-salary",
|
|
]
|
|
assert dataplatform.repository("test").table("test-raw-username").partitions == ["username.csv"]
|
|
assert dataplatform.repository("test").table("test-raw-recovery").partitions == [
|
|
"2022.csv",
|
|
"2023.csv",
|
|
"2024.csv",
|
|
]
|
|
|
|
|
|
def test_content_from_graphset(dataplatform: DataPlateform):
|
|
assert dataplatform.graphset().node_sets == {
|
|
frozenset(
|
|
{
|
|
Node(name="test-bronze-username"),
|
|
}
|
|
),
|
|
frozenset(
|
|
{
|
|
Node(name="test-bronze-foo"),
|
|
}
|
|
),
|
|
frozenset(
|
|
{
|
|
Node(name="test-raw-username"),
|
|
}
|
|
),
|
|
frozenset(
|
|
{
|
|
Node(name="test-raw-username"),
|
|
Node(name="test-raw-recovery"),
|
|
}
|
|
),
|
|
}
|
|
assert dataplatform.graphset().edges == [
|
|
EdgeOnSet(
|
|
arrow="foo_flux",
|
|
sources=[Node(name="test-raw-username"), Node(name="test-raw-recovery")],
|
|
targets=[Node(name="test-bronze-foo")],
|
|
metadata={},
|
|
),
|
|
EdgeOnSet(
|
|
arrow="copy_flux",
|
|
sources=[Node(name="test-raw-username")],
|
|
targets=[Node(name="test-bronze-username")],
|
|
metadata={},
|
|
),
|
|
]
|
|
|
|
|
|
def test_content_from_graph(dataplatform: DataPlateform):
|
|
assert dataplatform.graph().nodes == {
|
|
Node(name="test-raw-recovery", metadata={}),
|
|
Node(name="test-raw-salary", metadata={}),
|
|
Node(name="test-raw-username", metadata={}),
|
|
Node(name="test-bronze-username", metadata={}),
|
|
Node(name="test-bronze-foo", metadata={}),
|
|
Node(name="test-raw-username", metadata={}),
|
|
}
|
|
assert dataplatform.graph().edges == [
|
|
Edge(
|
|
arrow="foo_flux",
|
|
source=Node(name="test-raw-username"),
|
|
target=Node(name="test-bronze-foo"),
|
|
metadata={},
|
|
),
|
|
Edge(
|
|
arrow="foo_flux",
|
|
source=Node(name="test-raw-recovery"),
|
|
target=Node(name="test-bronze-foo"),
|
|
metadata={},
|
|
),
|
|
Edge(
|
|
arrow="copy_flux",
|
|
source=Node(name="test-raw-username"),
|
|
target=Node(name="test-bronze-username"),
|
|
metadata={},
|
|
),
|
|
]
|
|
|
|
|
|
def test_content_from_graph_arguments(dataplatform: DataPlateform):
|
|
name_flux = lambda flux: f"flux-{flux.id}"
|
|
meta_flux = lambda flux: {"name": flux.name}
|
|
meta_table = lambda table: {"id": table.id, "partitions": table.partitions}
|
|
assert dataplatform.graph(
|
|
name_flux=name_flux, meta_flux=meta_flux, meta_table=meta_table
|
|
).nodes == {
|
|
Node(name="test-bronze-foo", metadata={"id": "test-bronze-foo", "partitions": []}),
|
|
Node(
|
|
name="test-raw-salary", metadata={"id": "test-raw-salary", "partitions": ["salary.pdf"]}
|
|
),
|
|
Node(
|
|
name="test-raw-recovery",
|
|
metadata={
|
|
"id": "test-raw-recovery",
|
|
"partitions": ["2022.csv", "2023.csv", "2024.csv"],
|
|
},
|
|
),
|
|
Node(
|
|
name="test-bronze-username", metadata={"id": "test-bronze-username", "partitions": []}
|
|
),
|
|
Node(
|
|
name="test-raw-username",
|
|
metadata={"id": "test-raw-username", "partitions": ["username.csv"]},
|
|
),
|
|
}
|
|
|
|
assert dataplatform.graph(
|
|
name_flux=name_flux, meta_flux=meta_flux, meta_table=meta_table
|
|
).edges == [
|
|
Edge(
|
|
arrow="flux-foo_flux",
|
|
source=Node(
|
|
name="test-raw-username",
|
|
metadata={"id": "test-raw-username", "partitions": ["username.csv"]},
|
|
),
|
|
target=Node(
|
|
name="test-bronze-foo", metadata={"id": "test-bronze-foo", "partitions": []}
|
|
),
|
|
metadata={"name": "foo"},
|
|
),
|
|
Edge(
|
|
arrow="flux-foo_flux",
|
|
source=Node(
|
|
name="test-raw-recovery",
|
|
metadata={
|
|
"id": "test-raw-recovery",
|
|
"partitions": ["2022.csv", "2023.csv", "2024.csv"],
|
|
},
|
|
),
|
|
target=Node(
|
|
name="test-bronze-foo", metadata={"id": "test-bronze-foo", "partitions": []}
|
|
),
|
|
metadata={"name": "foo"},
|
|
),
|
|
Edge(
|
|
arrow="flux-copy_flux",
|
|
source=Node(
|
|
name="test-raw-username",
|
|
metadata={"id": "test-raw-username", "partitions": ["username.csv"]},
|
|
),
|
|
target=Node(
|
|
name="test-bronze-username",
|
|
metadata={"id": "test-bronze-username", "partitions": []},
|
|
),
|
|
metadata={"name": "copy"},
|
|
),
|
|
]
|
|
|
|
|
|
def test_execute_flux(dataplatform: DataPlateform):
|
|
meta = dataplatform.execute_flux("foo_flux")
|
|
assert meta.data == {"who": "foo"}
|
|
|
|
assert dataplatform.repository("test").schema("test-bronze").tables == []
|
|
|
|
meta = dataplatform.execute_flux("copy_flux")
|
|
assert meta.data == {"src_size": 283, "tgt_size": 283}
|
|
|
|
assert dataplatform.repository("test").schema("test-bronze").tables == ["test-bronze-username"]
|