plesna/tests/dataplatform/test_dataplateform.py

138 lines
4.0 KiB
Python
Raw Normal View History

2025-01-03 15:01:01 +00:00
import shutil
2025-01-03 07:59:54 +00:00
from pathlib import Path
import pytest
from plesna.dataplatform import DataPlateform
2025-01-04 12:51:24 +00:00
from plesna.models.flux import Flux
from plesna.models.transformation import Transformation
2025-01-03 15:01:01 +00:00
from plesna.storage.repository.fs_repository import FSRepository
2025-01-03 07:59:54 +00:00
2025-01-03 15:01:01 +00:00
# Sample input files live in ``raw_datas``, two directory levels above this module.
FIXTURE_DIR = Path(__file__).parent.parent / "raw_datas"
2025-01-03 07:59:54 +00:00
@pytest.fixture
def repository(tmp_path) -> FSRepository:
    """Build an ``FSRepository`` on top of a freshly populated temporary directory.

    The content of ``FIXTURE_DIR`` is copied to ``<tmp_path>/raw`` and two empty
    directories, ``bronze`` and ``silver``, are created next to it before the
    repository is instantiated on ``tmp_path``.
    """
    # Fail early if the sample data directory is missing.
    assert FIXTURE_DIR.exists()

    root = Path(tmp_path)
    shutil.copytree(src=FIXTURE_DIR.absolute(), dst=(root / "raw").absolute())

    for layer in ("bronze", "silver"):
        (root / layer).mkdir()

    return FSRepository("test", tmp_path, "test")
2025-01-03 07:59:54 +00:00
2025-01-03 15:01:01 +00:00
def test_add_repository(
    repository: FSRepository,
):
    """A repository added under a name is listed and can be fetched by that name."""
    platform = DataPlateform()
    platform.add_repository("test", repository)

    assert platform.repositories == ["test"]
    assert platform.repository("test") == repository
@pytest.fixture
def foo_flux(repository: FSRepository) -> Flux:
    """Return a flux whose transformation ignores its inputs and returns ``{"who": "foo"}``.

    The source is the ``test-raw-username`` table and the target is the
    ``test-bronze-username`` table, both keyed by ``"username"``.
    """

    def foo(sources, targets):
        return {"who": "foo"}

    return Flux(
        sources={"username": repository.table("test-raw-username")},
        targets={"username": repository.table("test-bronze-username")},
        transformation=Transformation(function=foo, extra_kwrds={}),
    )
2025-01-03 15:01:01 +00:00
2025-01-03 07:59:54 +00:00
2025-01-04 12:51:24 +00:00
@pytest.fixture
def copy_flux(repository: FSRepository) -> Flux:
    """Return a flux that copies the first raw ``username`` data file to the bronze table.

    The transformation returns the byte sizes of the source and of the copied
    file under the ``"src_size"`` and ``"tgt_size"`` keys.
    """

    def copy(sources, targets):
        origin = Path(sources["username"].datas[0])
        destination = Path(targets["username"].datas[0])
        shutil.copy(origin, destination)
        return {
            "src_size": origin.stat().st_size,
            "tgt_size": destination.stat().st_size,
        }

    return Flux(
        sources={"username": repository.table("test-raw-username")},
        targets={"username": repository.table("test-bronze-username")},
        transformation=Transformation(function=copy, extra_kwrds={}),
    )
2025-01-04 12:51:24 +00:00
2025-01-04 14:30:32 +00:00
def test_add_flux(repository: FSRepository, copy_flux: Flux):
    """Fluxes are listed in the order they were added and can be fetched by name."""
    platform = DataPlateform()
    platform.add_repository("test", repository)

    platform.add_flux(name="copy_flux", flux=copy_flux)
    assert platform.fluxes == ["copy_flux"]

    # The same flux object may be registered again under a second name.
    platform.add_flux(name="copy_flux_bis", flux=copy_flux)
    assert platform.fluxes == ["copy_flux", "copy_flux_bis"]

    assert platform.flux("copy_flux") == copy_flux
    assert platform.flux("copy_flux_bis") == copy_flux
2025-01-04 14:30:32 +00:00
@pytest.fixture
def dataplatform(
    repository: FSRepository,
    foo_flux: Flux,
    copy_flux: Flux,
) -> DataPlateform:
    """Return a platform holding the ``test`` repository and the two sample fluxes.

    The fluxes are registered as ``"foo"`` and ``"raw_brz_copy_username"``.
    """
    platform = DataPlateform()
    platform.add_repository("test", repository)
    platform.add_flux("foo", foo_flux)
    platform.add_flux("raw_brz_copy_username", copy_flux)
    return platform
def test_listing_content(dataplatform: DataPlateform):
    """Schemas, tables and partitions of the ``test`` repository are listed as expected."""
    expected_schemas = ["test-raw", "test-bronze", "test-silver"]
    expected_raw_tables = [
        "test-raw-username",
        "test-raw-recovery",
        "test-raw-salary",
    ]
    expected_username_partitions = ["username.csv"]
    expected_recovery_partitions = [
        "2022.csv",
        "2023.csv",
        "2024.csv",
    ]

    assert dataplatform.repository("test").schemas() == expected_schemas
    assert dataplatform.repository("test").schema("test-raw").tables == expected_raw_tables
    assert (
        dataplatform.repository("test").table("test-raw-username").partitions
        == expected_username_partitions
    )
    assert (
        dataplatform.repository("test").table("test-raw-recovery").partitions
        == expected_recovery_partitions
    )
2025-01-04 14:30:32 +00:00
2025-01-05 13:34:16 +00:00
def test_content_from_graph(dataplatform: DataPlateform):
    """Placeholder test: nothing is asserted about the platform yet."""
    # Disabled assertion kept from the original:
    # assert dataplatform.graphset.model_dump() == {}
2025-01-04 14:30:32 +00:00
def test_execute_flux(dataplatform: DataPlateform):
    """Execute both registered fluxes and check their metadata and the bronze schema."""
    foo_meta = dataplatform.execute_flux("foo")
    assert foo_meta.data == {"who": "foo"}

    # The bronze schema lists no table before the copy flux has run.
    assert dataplatform.repository("test").schema("test-bronze").tables == []

    copy_meta = dataplatform.execute_flux("raw_brz_copy_username")
    assert copy_meta.data == {"src_size": 283, "tgt_size": 283}

    # After the copy, the bronze schema lists the username table.
    assert dataplatform.repository("test").schema("test-bronze").tables == ["test-bronze-username"]