import shutil
from pathlib import Path

import pytest

from plesna.dataplatform import DataPlateform
from plesna.models.flux import Flux
from plesna.models.transformation import Transformation
from plesna.storage.repository.fs_repository import FSRepository

# Directory holding the sample files copied into the "raw" schema of each
# test repository.
FIXTURE_DIR = Path(__file__).parent.parent / Path("raw_datas")


@pytest.fixture
def repository(tmp_path) -> FSRepository:
    """Build an FSRepository on ``tmp_path`` with raw/bronze/silver directories.

    Every entry of ``FIXTURE_DIR`` is copied into ``raw/recovery`` or
    ``raw/salary`` when its file name contains that word, and into
    ``raw/username`` otherwise. ``bronze`` and ``silver`` are created empty.
    """
    raw_path = Path(tmp_path) / "raw"
    raw_path.mkdir()

    example_src = FIXTURE_DIR
    assert example_src.exists()

    recovery_loc = raw_path / "recovery"
    recovery_loc.mkdir()
    username_loc = raw_path / "username"
    username_loc.mkdir()
    salary_loc = raw_path / "salary"
    salary_loc.mkdir()

    for f in example_src.glob("*"):
        # Route on the file name only: testing str(f) would also match
        # "recovery"/"salary" appearing anywhere in the checkout path and
        # send every file to the same directory.
        if "recovery" in f.name:
            shutil.copy(f, recovery_loc)
        elif "salary" in f.name:
            shutil.copy(f, salary_loc)
        else:
            shutil.copy(f, username_loc)

    bronze_path = Path(tmp_path) / "bronze"
    bronze_path.mkdir()
    silver_path = Path(tmp_path) / "silver"
    silver_path.mkdir()

    return FSRepository("test", tmp_path, "test")


def test_add_repository(
    repository: FSRepository,
):
    """A registered repository is listed by name and retrievable by name."""
    dp = DataPlateform()
    dp.add_repository("test", repository)

    assert dp.repositories == ["test"]

    assert dp.repository("test") == repository


@pytest.fixture
def foo_flux(repository: FSRepository) -> Flux:
    """Flux from raw.username to bronze.username whose transformation ignores
    its inputs and returns ``{"who": "foo"}``."""
    src = {"username": repository.table("raw", "username")}
    targets = {"username": repository.table("bronze", "username")}

    def foo(sources, targets):
        return {"who": "foo"}

    extra_kwrds = {}

    flux = Flux(
        sources=src,
        targets=targets,
        transformation=Transformation(function=foo, extra_kwrds=extra_kwrds),
    )
    return flux


@pytest.fixture
def copy_flux(repository: FSRepository) -> Flux:
    """Flux copying raw.username to bronze.username.

    The transformation copies the first entry of the source table's ``datas``
    to the first entry of the target table's ``datas`` and returns the byte
    sizes of both files.
    """
    raw_username = {"username": repository.table("raw", "username")}
    bronze_username = {"username": repository.table("bronze", "username")}

    def copy(sources, targets):
        src_path = Path(sources["username"].datas[0])
        tgt_path = Path(targets["username"].datas[0])
        shutil.copy(src_path, tgt_path)
        return {"src_size": src_path.stat().st_size, "tgt_size": tgt_path.stat().st_size}

    extra_kwrds = {}

    raw_brz_copy_username = Flux(
        sources=raw_username,
        targets=bronze_username,
        transformation=Transformation(function=copy, extra_kwrds=extra_kwrds),
    )
    return raw_brz_copy_username


def test_add_flux(repository: FSRepository, copy_flux: Flux):
    """Fluxes are listed in registration order; one Flux may be added under
    several names."""
    dataplatform = DataPlateform()
    dataplatform.add_repository("test", repository)

    dataplatform.add_flux(name="copy_flux", flux=copy_flux)
    assert dataplatform.fluxes == ["copy_flux"]
    dataplatform.add_flux(name="copy_flux_bis", flux=copy_flux)
    assert dataplatform.fluxes == ["copy_flux", "copy_flux_bis"]

    assert dataplatform.flux("copy_flux") == copy_flux
    assert dataplatform.flux("copy_flux_bis") == copy_flux


@pytest.fixture
def dataplatform(
    repository: FSRepository,
    foo_flux: Flux,
    copy_flux: Flux,
) -> DataPlateform:
    """DataPlateform with repository "test" and fluxes "foo" and
    "raw_brz_copy_username" registered."""
    dp = DataPlateform()

    dp.add_repository("test", repository)

    dp.add_flux("foo", foo_flux)
    dp.add_flux("raw_brz_copy_username", copy_flux)
    return dp


def test_listing_content(dataplatform: DataPlateform):
    """Schemas, tables and partitions created by the fixture are listed."""
    # Compare order-insensitively: if these listings come from the filesystem
    # (presumably so for FSRepository), entry order is arbitrary and an
    # ordered comparison is platform-dependent and flaky.
    assert sorted(dataplatform.repository("test").schemas()) == sorted(
        ["raw", "bronze", "silver"]
    )
    assert sorted(dataplatform.repository("test").schema("raw").tables) == sorted(
        [
            "recovery",
            "username",
            "salary",
        ]
    )
    assert dataplatform.repository("test").table("raw", "username").partitions == ["username.csv"]


def test_execute_flux(dataplatform: DataPlateform):
    """Executing a flux exposes the transformation's return value as
    ``meta.data``; only the copy flux writes into bronze."""
    meta = dataplatform.execute_flux("foo")
    assert meta.data == {"who": "foo"}

    assert dataplatform.repository("test").schema("bronze").tables == []

    meta = dataplatform.execute_flux("raw_brz_copy_username")
    assert meta.data == {"src_size": 175, "tgt_size": 175}

    assert dataplatform.repository("test").schema("bronze").tables == ["username"]