"""Tests for ``DataPlateform`` backed by a filesystem repository.

Each test gets a fresh copy of the ``raw_datas`` fixture directory placed under
pytest's ``tmp_path`` as ``raw``, next to empty ``bronze`` and ``silver``
directories.
"""

import shutil
from pathlib import Path

import pytest

from plesna.dataplatform import DataPlateform
from plesna.models.graphs import Edge, EdgeOnSet, Node
from plesna.models.flux import Flux, Transformation
from plesna.storage.repository.fs_repository import FSRepository

FIXTURE_DIR = Path(__file__).parent.parent / "raw_datas"


@pytest.fixture
def repository(tmp_path: Path) -> FSRepository:
    """Build an ``FSRepository`` rooted at ``tmp_path``.

    ``raw`` is a copy of ``FIXTURE_DIR``; ``bronze`` and ``silver`` start empty.
    """
    example_src = FIXTURE_DIR
    assert example_src.exists(), f"missing fixture directory: {example_src}"

    # tmp_path is already a pathlib.Path; no extra Path(...) wrapping needed.
    raw_path = tmp_path / "raw"
    shutil.copytree(src=example_src.absolute(), dst=raw_path.absolute())

    bronze_path = tmp_path / "bronze"
    bronze_path.mkdir()
    silver_path = tmp_path / "silver"
    silver_path.mkdir()

    return FSRepository("test", "test", tmp_path)


def test_add_repository(
    repository: FSRepository,
):
    """A registered repository is listed and retrievable under ``"test"``."""
    dp = DataPlateform()
    dp.add_repository(repository)

    assert dp.repositories == ["test"]

    assert dp.repository("test") == repository


@pytest.fixture
def copy_flux(repository: FSRepository) -> Flux:
    """Flux copying the first raw ``username`` data file to the bronze table.

    The transformation returns the byte sizes of the source and target files.
    """
    raw_username = [repository.table("test-raw-username")]
    bronze_username = [repository.table("test-bronze-username")]

    def copy(sources, targets):
        # sources/targets are indexed by table name; datas[0] is used as a
        # file path (presumably the table's first data file — confirm).
        src_path = Path(sources["test-raw-username"].datas[0])
        tgt_path = Path(targets["test-bronze-username"].datas[0])
        shutil.copy(src_path, tgt_path)
        return {"src_size": src_path.stat().st_size, "tgt_size": tgt_path.stat().st_size}

    extra_kwrds = {}

    raw_brz_copy_username = Flux(
        id="copy_flux",
        name="copy",
        sources=raw_username,
        targets=bronze_username,
        transformation=Transformation(function=copy, extra_kwrds=extra_kwrds),
    )
    return raw_brz_copy_username


@pytest.fixture
def foo_flux(repository: FSRepository) -> Flux:
    """Flux from two raw tables to a bronze table that writes nothing.

    Its transformation ignores its inputs and returns ``{"who": "foo"}``.
    """
    src = [
        repository.table("test-raw-username"),
        repository.table("test-raw-recovery"),
    ]
    targets = [repository.table("test-bronze-foo")]

    def foo(sources, targets):
        return {"who": "foo"}

    extra_kwrds = {}

    flux = Flux(
        id="foo_flux",
        name="foo",
        sources=src,
        targets=targets,
        transformation=Transformation(function=foo, extra_kwrds=extra_kwrds),
    )
    return flux


def test_add_flux(repository: FSRepository, copy_flux: Flux):
    """Fluxes are registered by name; one flux may be added under several names."""
    dataplatform = DataPlateform()
    dataplatform.add_repository(repository)

    dataplatform.add_flux(name="copy_flux", flux=copy_flux)
    assert dataplatform.fluxes == ["copy_flux"]
    dataplatform.add_flux(name="copy_flux_bis", flux=copy_flux)
    assert dataplatform.fluxes == ["copy_flux", "copy_flux_bis"]

    assert dataplatform.flux("copy_flux") == copy_flux
    assert dataplatform.flux("copy_flux_bis") == copy_flux


@pytest.fixture
def dataplatform(
    repository: FSRepository,
    foo_flux: Flux,
    copy_flux: Flux,
) -> DataPlateform:
    """Platform with the test repository and both fluxes registered."""
    dp = DataPlateform()

    dp.add_repository(repository)

    dp.add_flux("foo", foo_flux)
    dp.add_flux("raw_brz_copy_username", copy_flux)
    return dp


def test_listing_content(dataplatform: DataPlateform):
    """Schemas, tables and partitions reflect the on-disk fixture layout."""
    assert dataplatform.repository("test").schemas() == ["test-raw", "test-bronze", "test-silver"]
    assert dataplatform.repository("test").schema("test-raw").tables == [
        "test-raw-username",
        "test-raw-recovery",
        "test-raw-salary",
    ]
    assert dataplatform.repository("test").table("test-raw-username").partitions == ["username.csv"]
    assert dataplatform.repository("test").table("test-raw-recovery").partitions == [
        "2022.csv",
        "2023.csv",
        "2024.csv",
    ]


def test_content_from_graphset(dataplatform: DataPlateform):
    """The graphset has one node set per flux side and one edge per flux."""
    assert dataplatform.graphset().node_sets == {
        frozenset(
            {
                Node(name="test-bronze-username"),
            }
        ),
        frozenset(
            {
                Node(name="test-bronze-foo"),
            }
        ),
        frozenset(
            {
                Node(name="test-raw-username"),
            }
        ),
        frozenset(
            {
                Node(name="test-raw-username"),
                Node(name="test-raw-recovery"),
            }
        ),
    }
    assert dataplatform.graphset().edges == [
        EdgeOnSet(
            arrow="foo_flux",
            sources=[Node(name="test-raw-username"), Node(name="test-raw-recovery")],
            targets=[Node(name="test-bronze-foo")],
            metadata={},
        ),
        EdgeOnSet(
            arrow="copy_flux",
            sources=[Node(name="test-raw-username")],
            targets=[Node(name="test-bronze-username")],
            metadata={},
        ),
    ]


def test_content_from_graph(dataplatform: DataPlateform):
    """The flattened graph has one edge per (source, target) pair of each flux."""
    # Each node is listed once; the original literal repeated
    # "test-raw-username", which a set silently collapses anyway.
    assert dataplatform.graph().nodes == {
        Node(name="test-raw-recovery", infos={}),
        Node(name="test-raw-salary", infos={}),
        Node(name="test-raw-username", infos={}),
        Node(name="test-bronze-username", infos={}),
        Node(name="test-bronze-foo", infos={}),
    }
    assert dataplatform.graph().edges == [
        Edge(
            arrow="foo_flux",
            source=Node(name="test-raw-username"),
            target=Node(name="test-bronze-foo"),
            metadata={},
        ),
        Edge(
            arrow="foo_flux",
            source=Node(name="test-raw-recovery"),
            target=Node(name="test-bronze-foo"),
            metadata={},
        ),
        Edge(
            arrow="copy_flux",
            source=Node(name="test-raw-username"),
            target=Node(name="test-bronze-username"),
            metadata={},
        ),
    ]


def test_execute_flux(dataplatform: DataPlateform):
    """Executing a flux returns its transformation's output as ``meta.data``."""
    meta = dataplatform.execute_flux("foo")
    assert meta.data == {"who": "foo"}

    # foo writes nothing, so the bronze schema is still empty.
    assert dataplatform.repository("test").schema("test-bronze").tables == []

    meta = dataplatform.execute_flux("raw_brz_copy_username")
    # 283 is the byte size of the raw username fixture file copied by copy_flux.
    assert meta.data == {"src_size": 283, "tgt_size": 283}

    assert dataplatform.repository("test").schema("test-bronze").tables == ["test-bronze-username"]