"""Tests for registering repositories and fluxes on a ``DataPlateform``."""

import shutil
from pathlib import Path

import pytest

from plesna.dataplatform import DataPlateform
from plesna.models.flux import Flux
from plesna.models.transformation import Transformation
from plesna.storage.repository.fs_repository import FSRepository

# Directory holding the sample files that seed every test repository.
FIXTURE_DIR = Path(__file__).parent.parent / Path("raw_datas")


@pytest.fixture
def repository(tmp_path: Path) -> FSRepository:
    """Build an ``FSRepository`` over a freshly populated temporary tree.

    Layout created under ``tmp_path``::

        raw/recovery/   sample files whose name contains "recovery"
        raw/salary/     sample files whose name contains "salary"
        raw/username/   every other sample file
        bronze/         empty
        silver/         empty

    Returns:
        ``FSRepository("test", tmp_path, "test")``.
        NOTE(review): the meaning of the first and third arguments is not
        visible from this file -- confirm against ``FSRepository``.
    """
    raw_path = Path(tmp_path) / "raw"
    raw_path.mkdir()

    example_src = FIXTURE_DIR
    assert example_src.exists()

    # Creation order is kept as recovery, username, salary; the listing test
    # below compares against that exact order.
    recovery_loc = raw_path / "recovery"
    recovery_loc.mkdir()
    username_loc = raw_path / "username"
    username_loc.mkdir()
    salary_loc = raw_path / "salary"
    salary_loc.mkdir()

    for f in example_src.glob("*"):
        # Match on the file name only, so that a parent directory containing
        # "recovery" or "salary" cannot misroute files. The branches are
        # mutually exclusive: each file is copied into exactly one table.
        if "recovery" in f.name:
            shutil.copy(f, recovery_loc)
        elif "salary" in f.name:
            shutil.copy(f, salary_loc)
        else:
            shutil.copy(f, username_loc)

    bronze_path = Path(tmp_path) / "bronze"
    bronze_path.mkdir()

    silver_path = Path(tmp_path) / "silver"
    silver_path.mkdir()

    return FSRepository("test", tmp_path, "test")


def test_add_repository(
    repository: FSRepository,
):
    """A repository added under a name is listed and retrievable by that name."""
    dp = DataPlateform()
    dp.add_repository("test", repository)

    assert dp.repositories == ["test"]
    assert dp.repository("test") == repository


@pytest.fixture
def dataplatform(
    repository: FSRepository,
) -> DataPlateform:
    """Return a ``DataPlateform`` with ``repository`` registered as ``"test"``."""
    dp = DataPlateform()
    dp.add_repository("test", repository)
    return dp


def test_listing_content(dataplatform: DataPlateform):
    """The registered repository exposes the expected schemas and raw tables."""
    assert dataplatform.repository("test").schemas() == ["raw", "bronze", "silver"]
    assert dataplatform.repository("test").schema("raw").tables == [
        "recovery",
        "username",
        "salary",
    ]


@pytest.fixture
def copy_flux(repository: FSRepository) -> Flux:
    """Return a ``Flux`` from raw ``username`` to bronze ``username``.

    The transformation function is a no-op placeholder; the tests in this
    module only register the flux and never execute it.
    """
    src = {"username": repository.table("raw", "username")}
    targets = {"username": repository.table("bronze", "username")}

    def copy(sources, targets):
        pass

    extra_kwrds = {}

    flux = Flux(
        sources=src,
        targets=targets,
        transformation=Transformation(function=copy, extra_kwrds=extra_kwrds),
    )
    return flux


def test_add_flux(dataplatform: DataPlateform, copy_flux: Flux):
    """Fluxes are listed in insertion order and retrievable by name."""
    dataplatform.add_flux(name="copy_flux", flux=copy_flux)
    assert dataplatform.fluxes == ["copy_flux"]

    # The same flux object may be registered a second time under another name.
    dataplatform.add_flux(name="copy_flux_bis", flux=copy_flux)
    assert dataplatform.fluxes == ["copy_flux", "copy_flux_bis"]

    assert dataplatform.flux("copy_flux") == copy_flux
    assert dataplatform.flux("copy_flux_bis") == copy_flux