# plesna/tests/dataplatform/test_dataplateform.py
#
# 150 lines
# 4.3 KiB
# Python

import shutil
from pathlib import Path
import pytest
from plesna.dataplatform import DataPlateform
from plesna.graph.graph import Node
from plesna.models.flux import Flux
from plesna.models.transformation import Transformation
from plesna.storage.repository.fs_repository import FSRepository
# Sample raw data shipped two directories above this file; the repository
# fixture copies it into each test's temporary directory.
FIXTURE_DIR = Path(__file__).parent.parent / "raw_datas"
@pytest.fixture
def repository(tmp_path) -> FSRepository:
    """Return an FSRepository rooted at a temporary directory seeded with fixtures.

    The content of ``FIXTURE_DIR`` is copied to ``<tmp_path>/raw`` and empty
    ``bronze`` and ``silver`` directories are created next to it.
    """
    assert FIXTURE_DIR.exists()

    root = Path(tmp_path)
    shutil.copytree(src=FIXTURE_DIR.absolute(), dst=(root / "raw").absolute())
    for layer in ("bronze", "silver"):
        (root / layer).mkdir()

    return FSRepository("test", "test", tmp_path)
def test_add_repository(
    repository: FSRepository,
):
    """A registered repository is listed by name and can be fetched by that name."""
    platform = DataPlateform()
    platform.add_repository(repository)

    assert platform.repositories == ["test"]
    assert platform.repository("test") == repository
@pytest.fixture
def copy_flux(repository: FSRepository) -> Flux:
    """Return a Flux from ``test-raw-username`` to ``test-bronze-username``.

    Its transformation copies the path at ``datas[0]`` of the source table to
    the path at ``datas[0]`` of the target table and reports both file sizes.
    """

    def copy(sources, targets):
        origin = Path(sources["test-raw-username"].datas[0])
        destination = Path(targets["test-bronze-username"].datas[0])
        shutil.copy(origin, destination)
        return {
            "src_size": origin.stat().st_size,
            "tgt_size": destination.stat().st_size,
        }

    return Flux(
        sources=[repository.table("test-raw-username")],
        targets=[repository.table("test-bronze-username")],
        transformation=Transformation(function=copy, extra_kwrds={}),
    )
@pytest.fixture
def foo_flux(repository: FSRepository) -> Flux:
    """Return a Flux from two raw tables to ``test-bronze-foo``.

    Its transformation ignores its inputs and returns a constant dict.
    """

    def foo(sources, targets):
        return {"who": "foo"}

    raw_tables = [
        repository.table(table_name)
        for table_name in ("test-raw-username", "test-raw-recovery")
    ]
    return Flux(
        sources=raw_tables,
        targets=[repository.table("test-bronze-foo")],
        transformation=Transformation(function=foo, extra_kwrds={}),
    )
def test_add_flux(repository: FSRepository, copy_flux: Flux):
    """Fluxes are listed in insertion order and can be fetched back by name."""
    platform = DataPlateform()
    platform.add_repository(repository)

    platform.add_flux(name="copy_flux", flux=copy_flux)
    assert platform.fluxes == ["copy_flux"]

    # The same Flux object may be registered again under a second name.
    platform.add_flux(name="copy_flux_bis", flux=copy_flux)
    assert platform.fluxes == ["copy_flux", "copy_flux_bis"]

    for flux_name in ("copy_flux", "copy_flux_bis"):
        assert platform.flux(flux_name) == copy_flux
@pytest.fixture
def dataplatform(
    repository: FSRepository,
    foo_flux: Flux,
    copy_flux: Flux,
) -> DataPlateform:
    """Return a DataPlateform with the test repository and both fluxes registered."""
    platform = DataPlateform()
    platform.add_repository(repository)

    registrations = (
        ("foo", foo_flux),
        ("raw_brz_copy_username", copy_flux),
    )
    for flux_name, flux in registrations:
        platform.add_flux(flux_name, flux)

    return platform
def test_listing_content(dataplatform: DataPlateform):
    """Schemas, tables and partitions of the ``test`` repository are listed as expected."""
    repo = dataplatform.repository("test")

    assert repo.schemas() == ["test-raw", "test-bronze", "test-silver"]

    expected_raw_tables = [
        "test-raw-username",
        "test-raw-recovery",
        "test-raw-salary",
    ]
    assert repo.schema("test-raw").tables == expected_raw_tables

    assert repo.table("test-raw-username").partitions == ["username.csv"]

    expected_recovery_partitions = [
        "2022.csv",
        "2023.csv",
        "2024.csv",
    ]
    assert repo.table("test-raw-recovery").partitions == expected_recovery_partitions
def test_content_from_graph(dataplatform: DataPlateform):
    """The platform exposes the expected nodes through ``graph`` and ``graphset``."""
    expected_graph_nodes = {
        Node(name="test-raw-recovery", infos={}),
        Node(name="test-raw-salary", infos={}),
        Node(name="test-raw-username", infos={}),
    }
    assert dataplatform.graph.nodes == expected_graph_nodes

    expected_node_sets = {
        Node(name="test-raw-username", infos={}),
        Node(name="test-bronze-username", infos={}),
    }
    assert dataplatform.graphset.node_sets == expected_node_sets
def test_execute_flux(dataplatform: DataPlateform):
    """Executing a flux returns its transformation result in ``meta.data``.

    The ``foo`` flux leaves the bronze schema without tables, while the copy
    flux makes ``test-bronze-username`` appear in it.
    """

    def bronze_tables():
        # Looked up from scratch on every call so each check sees the current listing.
        return dataplatform.repository("test").schema("test-bronze").tables

    foo_meta = dataplatform.execute_flux("foo")
    assert foo_meta.data == {"who": "foo"}
    assert bronze_tables() == []

    copy_meta = dataplatform.execute_flux("raw_brz_copy_username")
    assert copy_meta.data == {"src_size": 283, "tgt_size": 283}
    assert bronze_tables() == ["test-bronze-username"]