Compare commits
	
		
			9 Commits
		
	
	
		
			b9dade2701
			...
			f2ed76c8aa
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| f2ed76c8aa | |||
| 041e459ca0 | |||
| e4af62b136 | |||
| 9a5c581f31 | |||
| 09783f9c1e | |||
| 8a43a93cda | |||
| ae61fd3c12 | |||
| d256fbf169 | |||
| 48964ad561 | 
| @@ -3,6 +3,6 @@ from plesna.models.flux import Flux, FluxMetaData | ||||
|  | ||||
| def consume_flux(flux: Flux) -> FluxMetaData: | ||||
|     metadata = flux.transformation.function( | ||||
|         sources=flux.sources, targets=flux.targets, **flux.transformation.extra_kwrds | ||||
|         sources=flux.sources_dict, targets=flux.targets_dict, **flux.transformation.extra_kwrds | ||||
|     ) | ||||
|     return FluxMetaData(data=metadata) | ||||
|   | ||||
| @@ -1,6 +1,8 @@ | ||||
| from plesna.compute.consume_flux import consume_flux | ||||
| from plesna.graph.graph_set import GraphSet | ||||
| from plesna.graph.graph import Graph, Node | ||||
| from plesna.graph.graph_set import EdgeOnSet, GraphSet | ||||
| from plesna.models.flux import Flux, FluxMetaData | ||||
| from plesna.models.libs.flux_graph import flux_to_edgeonset | ||||
| from plesna.storage.repository.repository import Repository | ||||
|  | ||||
|  | ||||
| @@ -11,31 +13,52 @@ class DataPlateformError(Exception): | ||||
| class DataPlateform: | ||||
|     def __init__(self): | ||||
|         self._graphset = GraphSet() | ||||
|         self._graph = Graph() | ||||
|         self._metadata_engine = "" | ||||
|         self._fluxes = {} | ||||
|         self._repositories = {} | ||||
|  | ||||
|     def add_repository(self, name: str, repository: Repository) -> str: | ||||
|         if name in self._repositories: | ||||
|             raise DataPlateformError("The repository {name} already exists") | ||||
|     @property | ||||
|     def graphset(self) -> GraphSet: | ||||
|         return self._graphset | ||||
|  | ||||
|         self._repositories[name] = repository | ||||
|         return name | ||||
|     @property | ||||
|     def graph(self) -> Graph: | ||||
|         return self._graph | ||||
|  | ||||
|     def repository_graph_feed(self, repository_id: str): | ||||
|         for schema in self._repositories[repository_id].schemas(): | ||||
|             for table in self._repositories[repository_id].tables(schema): | ||||
|                 self._graph.add_node(Node(name=table)) | ||||
|  | ||||
|     def add_repository(self, repository: Repository) -> str: | ||||
|         if repository.id in self._repositories: | ||||
|             raise DataPlateformError("The repository {repository.id} already exists") | ||||
|  | ||||
|         self._repositories[repository.id] = repository | ||||
|         self.repository_graph_feed(repository.id) | ||||
|         return repository.id | ||||
|  | ||||
|     @property | ||||
|     def repositories(self) -> list[str]: | ||||
|         return list(self._repositories) | ||||
|  | ||||
|     def repository(self, name: str) -> Repository: | ||||
|         return self._repositories[name] | ||||
|     def repository(self, id: str) -> Repository: | ||||
|         return self._repositories[id] | ||||
|  | ||||
|     def add_flux(self, name: str, flux: Flux) -> str: | ||||
|         if name in self._fluxes: | ||||
|             raise DataPlateformError("The flux {name} already exists") | ||||
|  | ||||
|         self._fluxes[name] = flux | ||||
|         self.flux_graphset_feed(name) | ||||
|         return name | ||||
|  | ||||
|     def flux_graphset_feed(self, flux_name: str): | ||||
|         flux = self.flux(flux_name) | ||||
|         edge = flux_to_edgeonset(flux) | ||||
|         self._graphset.append(edge) | ||||
|  | ||||
|     @property | ||||
|     def fluxes(self) -> list[str]: | ||||
|         return list(self._fluxes) | ||||
|   | ||||
| @@ -1,21 +1,6 @@ | ||||
| from functools import reduce | ||||
|  | ||||
| from pydantic import BaseModel | ||||
|  | ||||
|  | ||||
| class Node(BaseModel): | ||||
|     name: str | ||||
|     infos: dict = {} | ||||
|  | ||||
|     def __hash__(self): | ||||
|         return hash(self.name) | ||||
|  | ||||
|  | ||||
| class Edge(BaseModel): | ||||
|     arrow_name: str | ||||
|     source: Node | ||||
|     target: Node | ||||
|     edge_kwrds: dict = {} | ||||
| from functools import reduce | ||||
| from plesna.models.graphs import Node, Edge | ||||
|  | ||||
|  | ||||
| class Graph: | ||||
|   | ||||
| @@ -1,20 +1,4 @@ | ||||
| from typing import Callable | ||||
|  | ||||
| from pydantic import BaseModel | ||||
|  | ||||
|  | ||||
| class Node(BaseModel): | ||||
|     name: str | ||||
|     infos: dict = {} | ||||
|  | ||||
|     def __hash__(self): | ||||
|         return hash(self.name) | ||||
|  | ||||
|  | ||||
| class EdgeOnSet(BaseModel): | ||||
|     arrow: str | ||||
|     sources: list[Node] | ||||
|     targets: list[Node] | ||||
| from plesna.models.graphs import EdgeOnSet | ||||
|  | ||||
|  | ||||
| class GraphSet: | ||||
|   | ||||
| @@ -1,14 +1,48 @@ | ||||
| from pydantic import BaseModel | ||||
| from collections.abc import Callable | ||||
| from pydantic import BaseModel, computed_field | ||||
|  | ||||
| from plesna.models.storage import Table | ||||
| from plesna.models.transformation import Transformation | ||||
|  | ||||
|  | ||||
| class Transformation(BaseModel): | ||||
|     """ | ||||
|     The function have to have at least 2 arguments: sources and targets | ||||
|     Other arguments will came throught extra_kwrds | ||||
|  | ||||
|     The function will have to return metadata as dict | ||||
|     """ | ||||
|  | ||||
|     function: Callable | ||||
|     extra_kwrds: dict = {} | ||||
|  | ||||
|  | ||||
| class Flux(BaseModel): | ||||
|     sources: dict[str, Table] | ||||
|     targets: dict[str, Table] | ||||
|     id: str | ||||
|     name: str | ||||
|     sources: list[Table] | ||||
|     targets: list[Table] | ||||
|     transformation: Transformation | ||||
|  | ||||
|     @computed_field | ||||
|     @property | ||||
|     def sources_dict(self) -> dict[str, Table]: | ||||
|         return {s.id: s for s in self.sources} | ||||
|  | ||||
|     @computed_field | ||||
|     @property | ||||
|     def sources_id(self) -> dict[str, Table]: | ||||
|         return [s.id for s in self.sources] | ||||
|  | ||||
|     @computed_field | ||||
|     @property | ||||
|     def targets_id(self) -> dict[str, Table]: | ||||
|         return [s.id for s in self.targets] | ||||
|  | ||||
|     @computed_field | ||||
|     @property | ||||
|     def targets_dict(self) -> dict[str, Table]: | ||||
|         return {s.id: s for s in self.targets} | ||||
|  | ||||
|  | ||||
| class FluxMetaData(BaseModel): | ||||
|     data: dict | ||||
|   | ||||
							
								
								
									
										21
									
								
								plesna/models/graphs.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										21
									
								
								plesna/models/graphs.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,21 @@ | ||||
| from pydantic import BaseModel | ||||
|  | ||||
|  | ||||
| class Node(BaseModel): | ||||
|     name: str | ||||
|  | ||||
|     def __hash__(self): | ||||
|         return hash(self.name) | ||||
|  | ||||
|  | ||||
| class Edge(BaseModel): | ||||
|     arrow_name: str | ||||
|     source: Node | ||||
|     target: Node | ||||
|     edge_kwrds: dict = {} | ||||
|  | ||||
|  | ||||
| class EdgeOnSet(BaseModel): | ||||
|     arrow: str | ||||
|     sources: list[Node] | ||||
|     targets: list[Node] | ||||
| @@ -1,15 +0,0 @@ | ||||
| from collections.abc import Callable | ||||
|  | ||||
| from pydantic import BaseModel | ||||
|  | ||||
|  | ||||
| class Transformation(BaseModel): | ||||
|     """ | ||||
|     The function have to have at least 2 arguments: sources and targets | ||||
|     Other arguments will came throught extra_kwrds | ||||
|  | ||||
|     The function will have to return metadata as dict | ||||
|     """ | ||||
|  | ||||
|     function: Callable | ||||
|     extra_kwrds: dict = {} | ||||
| @@ -2,29 +2,16 @@ from pathlib import Path | ||||
|  | ||||
| from pydantic import BaseModel, computed_field | ||||
|  | ||||
| from plesna.libs.string_tools import extract_values_from_pattern | ||||
| from plesna.models.storage import Partition, Schema, Table | ||||
| from plesna.storage.repository.repository import Repository | ||||
|  | ||||
|  | ||||
| class FSPartition(BaseModel): | ||||
|     name: str | ||||
|     path: Path | ||||
|  | ||||
|     @computed_field | ||||
|     @property | ||||
|     def ref(self) -> Partition: | ||||
|         return Partition( | ||||
|             id=str(self.path), | ||||
|             repo_id=str(self.path.parent.parent.parent), | ||||
|             schema_id=str(self.path.parent.parent), | ||||
|             table_id=str(self.path.parent), | ||||
|             name=self.name, | ||||
|             value=str(self.path.absolute()), | ||||
|         ) | ||||
|  | ||||
|  | ||||
| class FSTable(BaseModel): | ||||
|     name: str | ||||
|     repo_id: str | ||||
|     schema_id: str | ||||
|     id: str | ||||
|     path: Path | ||||
|     is_partitionned: bool | ||||
|     partitions: list[str] = [] | ||||
| @@ -38,9 +25,9 @@ class FSTable(BaseModel): | ||||
|             datas = [str(self.path.absolute())] | ||||
|  | ||||
|         return Table( | ||||
|             id=str(self.path), | ||||
|             repo_id=str(self.path.parent.parent), | ||||
|             schema_id=str(self.path.parent), | ||||
|             id=self.id, | ||||
|             repo_id=self.repo_id, | ||||
|             schema_id=self.schema_id, | ||||
|             name=self.name, | ||||
|             value=str(self.path.absolute()), | ||||
|             partitions=self.partitions, | ||||
| @@ -50,6 +37,8 @@ class FSTable(BaseModel): | ||||
|  | ||||
| class FSSchema(BaseModel): | ||||
|     name: str | ||||
|     repo_id: str | ||||
|     id: str | ||||
|     path: Path | ||||
|     tables: list[str] | ||||
|  | ||||
| @@ -57,14 +46,18 @@ class FSSchema(BaseModel): | ||||
|     @property | ||||
|     def ref(self) -> Schema: | ||||
|         return Schema( | ||||
|             id=str(self.path), | ||||
|             repo_id=str(self.path.parent), | ||||
|             id=self.id, | ||||
|             repo_id=self.repo_id, | ||||
|             name=self.name, | ||||
|             value=str(self.path.absolute()), | ||||
|             tables=self.tables, | ||||
|         ) | ||||
|  | ||||
|  | ||||
| class FSRepositoryError(ValueError): | ||||
|     pass | ||||
|  | ||||
|  | ||||
| class FSRepository(Repository): | ||||
|     """Repository based on files tree structure | ||||
|  | ||||
| @@ -74,11 +67,15 @@ class FSRepository(Repository): | ||||
|  | ||||
|     """ | ||||
|  | ||||
|     def __init__(self, name: str, basepath: str, id: str): | ||||
|         self._basepath = Path(basepath) | ||||
|         self.name = name | ||||
|         self.id = id | ||||
|     ID_FMT = { | ||||
|         "schema": "{repo_id}-{schema_name}", | ||||
|         "table": "{schema_id}-{table_name}", | ||||
|     } | ||||
|  | ||||
|     def __init__(self, id: str, name: str, basepath: str): | ||||
|         super().__init__(id, name) | ||||
|  | ||||
|         self._basepath = Path(basepath) | ||||
|         assert self._basepath.exists() | ||||
|  | ||||
|     def ls(self, dir="", only_files=False, only_directories=False, recursive=False) -> list[str]: | ||||
| @@ -112,47 +109,89 @@ class FSRepository(Repository): | ||||
|  | ||||
|         return [str(f.relative_to(dirpath)) for f in paths if not str(f).startswith(".")] | ||||
|  | ||||
|     def parse_id(self, string: str, id_type: str) -> dict: | ||||
|         if id_type not in self.ID_FMT: | ||||
|             raise FSRepositoryError( | ||||
|                 "Wrong id_type. Gots {id_type} needs to be one of {self.ID_FMT.values}" | ||||
|             ) | ||||
|         parsed = extract_values_from_pattern(self.ID_FMT[id_type], string) | ||||
|         if not parsed: | ||||
|             raise FSRepositoryError( | ||||
|                 f"Wrong format for {id_type}. Got {string} need {self.ID_FMT['id_type']}" | ||||
|             ) | ||||
|         return parsed | ||||
|  | ||||
|     def schemas(self) -> list[str]: | ||||
|         """List schemas (sub directories within basepath)""" | ||||
|         subdirectories = self.ls("", only_directories=True) | ||||
|         return [str(d) for d in subdirectories] | ||||
|         return [ | ||||
|             self.ID_FMT["schema"].format(repo_id=self.id, schema_name=d) for d in subdirectories | ||||
|         ] | ||||
|  | ||||
|     def _schema(self, name: str) -> FSSchema: | ||||
|     def _schema(self, schema_id: str) -> FSSchema: | ||||
|         """List schemas (sub directories within basepath)""" | ||||
|         schema_path = self._basepath / name | ||||
|         tables = self.tables(schema=name) | ||||
|         return FSSchema(name=name, path=schema_path, tables=tables) | ||||
|         parsed = self.parse_id(schema_id, "schema") | ||||
|  | ||||
|     def schema(self, name: str) -> Schema: | ||||
|         return self._schema(name).ref | ||||
|         repo_id = parsed["repo_id"] | ||||
|         schema_name = parsed["schema_name"] | ||||
|         schema_path = self._basepath / schema_name | ||||
|  | ||||
|     def tables(self, schema: str) -> list[str]: | ||||
|         tables = self.ls(schema) | ||||
|         if repo_id != self.id: | ||||
|             raise FSRepositoryError("Trying to get schema that don't belong in this repository") | ||||
|  | ||||
|         tables = self.tables(schema_id) | ||||
|         return FSSchema( | ||||
|             name=schema_name, | ||||
|             id=schema_id, | ||||
|             repo_id=self.id, | ||||
|             schema_id=schema_id, | ||||
|             path=schema_path, | ||||
|             tables=tables, | ||||
|         ) | ||||
|  | ||||
|     def schema(self, schema_id: str) -> Schema: | ||||
|         return self._schema(schema_id).ref | ||||
|  | ||||
|     def _tables(self, schema_id: str) -> list[str]: | ||||
|         parsed = self.parse_id(schema_id, "schema") | ||||
|         tables = self.ls(parsed["schema_name"]) | ||||
|         return [self.ID_FMT["table"].format(table_name=t, schema_id=schema_id) for t in tables] | ||||
|  | ||||
|     def tables(self, schema_id: str = "") -> list[str]: | ||||
|         if schema_id: | ||||
|             return self._tables(schema_id) | ||||
|  | ||||
|         tables = [] | ||||
|         for schema in self.schemas(): | ||||
|             tables += self._tables(schema) | ||||
|         return tables | ||||
|  | ||||
|     def _table(self, schema: str, name: str) -> FSTable: | ||||
|     def _table(self, table_id: str) -> FSTable: | ||||
|         """Get infos on the table""" | ||||
|         table_path = self._basepath / schema / name | ||||
|         parsed = self.parse_id(table_id, "table") | ||||
|         schema = self._schema(parsed["schema_id"]) | ||||
|  | ||||
|         if not schema.path.exists(): | ||||
|             raise FSRepositoryError(f"The schema {schema.id} does not exists.") | ||||
|  | ||||
|         table_subpath = f"{schema.name}/{parsed['table_name']}" | ||||
|         table_path = self._basepath / table_subpath | ||||
|  | ||||
|         is_partitionned = table_path.is_dir() | ||||
|         if is_partitionned: | ||||
|             partitions = self.ls(f"{schema}/{name}", only_files=True) | ||||
|             partitions = self.ls(table_subpath, only_files=True) | ||||
|         else: | ||||
|             partitions = [] | ||||
|  | ||||
|         return FSTable( | ||||
|             name=name, | ||||
|             name=parsed["table_name"], | ||||
|             id=table_id, | ||||
|             repo_id=self.id, | ||||
|             schema_id=schema.id, | ||||
|             path=table_path, | ||||
|             is_partitionned=is_partitionned, | ||||
|             partitions=partitions, | ||||
|         ) | ||||
|  | ||||
|     def table(self, schema: str, name: str) -> Table: | ||||
|         return self._table(schema, name).ref | ||||
|  | ||||
|     def _partition(self, schema: str, table: str, partition: str) -> FSPartition: | ||||
|         """Get infos on the partition""" | ||||
|         table_path = self._basepath / schema / table | ||||
|         return FSPartition(name=partition, table_path=table_path) | ||||
|  | ||||
|     def partition(self, schema: str, name: str) -> Partition: | ||||
|         return self._partition(schema, name).ref | ||||
|     def table(self, table_id: str) -> Table: | ||||
|         return self._table(table_id).ref | ||||
|   | ||||
| @@ -4,35 +4,34 @@ from plesna.models.storage import Partition, Schema, Table | ||||
|  | ||||
|  | ||||
| class Repository: | ||||
|     def __init__(self): | ||||
|         pass | ||||
|     def __init__(self, id: str, name: str): | ||||
|         self._id = id | ||||
|         self._name = name | ||||
|  | ||||
|     @property | ||||
|     def id(self) -> str: | ||||
|         return self._id | ||||
|  | ||||
|     @property | ||||
|     def name(self) -> str: | ||||
|         return self._name | ||||
|  | ||||
|     @abc.abstractmethod | ||||
|     def schemas(self) -> list[str]: | ||||
|         """List schema's names""" | ||||
|         """List schema's ids""" | ||||
|         raise NotImplementedError | ||||
|  | ||||
|     @abc.abstractmethod | ||||
|     def schema(self, name: str) -> Schema: | ||||
|     def schema(self, schema_id: str) -> Schema: | ||||
|         """Get the schema properties""" | ||||
|         raise NotImplementedError | ||||
|  | ||||
|     @abc.abstractmethod | ||||
|     def tables(self, schema: str) -> list[str]: | ||||
|         """List table's name in schema""" | ||||
|     def tables(self, schema_id: str) -> list[str]: | ||||
|         """List table's name in schema (the id)""" | ||||
|         raise NotImplementedError | ||||
|  | ||||
|     @abc.abstractmethod | ||||
|     def table(self, schema: str, name: str) -> Table: | ||||
|         """Get the table properties""" | ||||
|         raise NotImplementedError | ||||
|  | ||||
|     @abc.abstractmethod | ||||
|     def partitions(self, schema: str, table: str) -> list[str]: | ||||
|         """List partition's name in table""" | ||||
|         raise NotImplementedError | ||||
|  | ||||
|     @abc.abstractmethod | ||||
|     def partition(self, schema: str, name: str, partition: str) -> Partition: | ||||
|         """Get the partition properties""" | ||||
|     def table(self, table_id: str) -> Table: | ||||
|         """Get the table properties (the id)""" | ||||
|         raise NotImplementedError | ||||
|   | ||||
| @@ -1,26 +1,17 @@ | ||||
| from plesna.compute.consume_flux import consume_flux | ||||
| from plesna.models.flux import Flux | ||||
| from plesna.models.flux import Flux, Transformation | ||||
| from plesna.models.storage import Table | ||||
| from plesna.models.transformation import Transformation | ||||
|  | ||||
|  | ||||
| def test_consume_flux(): | ||||
|     sources = { | ||||
|         "src1": Table( | ||||
|             id="src1", repo_id="test", schema_id="test", name="test", value="here", datas=["d"] | ||||
|         ), | ||||
|         "src2": Table( | ||||
|             id="src2", repo_id="test", schema_id="test", name="test", value="here", datas=["d"] | ||||
|         ), | ||||
|     } | ||||
|     targets = { | ||||
|         "tgt1": Table( | ||||
|             id="tgt1", repo_id="test", schema_id="test", name="test", value="this", datas=["d"] | ||||
|         ), | ||||
|         "tgt2": Table( | ||||
|             id="tgt2", repo_id="test", schema_id="test", name="test", value="that", datas=["d"] | ||||
|         ), | ||||
|     } | ||||
|     sources = [ | ||||
|         Table(id="src1", repo_id="test", schema_id="test", name="test", value="here", datas=["d"]), | ||||
|         Table(id="src2", repo_id="test", schema_id="test", name="test", value="here", datas=["d"]), | ||||
|     ] | ||||
|     targets = [ | ||||
|         Table(id="tgt1", repo_id="test", schema_id="test", name="test", value="this", datas=["d"]), | ||||
|         Table(id="tgt2", repo_id="test", schema_id="test", name="test", value="that", datas=["d"]), | ||||
|     ] | ||||
|  | ||||
|     def func(sources, targets, **kwrds): | ||||
|         return { | ||||
| @@ -30,6 +21,8 @@ def test_consume_flux(): | ||||
|         } | ||||
|  | ||||
|     flux = Flux( | ||||
|         id="flux", | ||||
|         name="flux", | ||||
|         sources=sources, | ||||
|         targets=targets, | ||||
|         transformation=Transformation(function=func, extra_kwrds={"extra": "super"}), | ||||
|   | ||||
| @@ -4,8 +4,8 @@ from pathlib import Path | ||||
| import pytest | ||||
|  | ||||
| from plesna.dataplatform import DataPlateform | ||||
| from plesna.models.flux import Flux | ||||
| from plesna.models.transformation import Transformation | ||||
| from plesna.models.graphs import Node | ||||
| from plesna.models.flux import Flux, Transformation | ||||
| from plesna.storage.repository.fs_repository import FSRepository | ||||
|  | ||||
| FIXTURE_DIR = Path(__file__).parent.parent / Path("raw_datas") | ||||
| @@ -24,52 +24,36 @@ def repository(tmp_path) -> FSRepository: | ||||
|     silver_path = Path(tmp_path) / "silver" | ||||
|     silver_path.mkdir() | ||||
|  | ||||
|     return FSRepository("test", tmp_path, "test") | ||||
|     return FSRepository("test", "test", tmp_path) | ||||
|  | ||||
|  | ||||
| def test_add_repository( | ||||
|     repository: FSRepository, | ||||
| ): | ||||
|     dp = DataPlateform() | ||||
|     dp.add_repository("test", repository) | ||||
|     dp.add_repository(repository) | ||||
|  | ||||
|     assert dp.repositories == ["test"] | ||||
|  | ||||
|     assert dp.repository("test") == repository | ||||
|  | ||||
|  | ||||
| @pytest.fixture | ||||
| def foo_flux(repository: FSRepository) -> Flux: | ||||
|     src = {"username": repository.table("raw", "username")} | ||||
|     targets = {"username": repository.table("bronze", "username")} | ||||
|  | ||||
|     def foo(sources, targets): | ||||
|         return {"who": "foo"} | ||||
|  | ||||
|     extra_kwrds = {} | ||||
|  | ||||
|     flux = Flux( | ||||
|         sources=src, | ||||
|         targets=targets, | ||||
|         transformation=Transformation(function=foo, extra_kwrds=extra_kwrds), | ||||
|     ) | ||||
|     return flux | ||||
|  | ||||
|  | ||||
| @pytest.fixture | ||||
| def copy_flux(repository: FSRepository) -> Flux: | ||||
|     raw_username = {"username": repository.table("raw", "username")} | ||||
|     bronze_username = {"username": repository.table("bronze", "username")} | ||||
|     raw_username = [repository.table("test-raw-username")] | ||||
|     bronze_username = [repository.table("test-bronze-username")] | ||||
|  | ||||
|     def copy(sources, targets): | ||||
|         src_path = Path(sources["username"].datas[0]) | ||||
|         tgt_path = Path(targets["username"].datas[0]) | ||||
|         src_path = Path(sources["test-raw-username"].datas[0]) | ||||
|         tgt_path = Path(targets["test-bronze-username"].datas[0]) | ||||
|         shutil.copy(src_path, tgt_path) | ||||
|         return {"src_size": src_path.stat().st_size, "tgt_size": tgt_path.stat().st_size} | ||||
|  | ||||
|     extra_kwrds = {} | ||||
|  | ||||
|     raw_brz_copy_username = Flux( | ||||
|         id="copy_flux", | ||||
|         name="copy", | ||||
|         sources=raw_username, | ||||
|         targets=bronze_username, | ||||
|         transformation=Transformation(function=copy, extra_kwrds=extra_kwrds), | ||||
| @@ -77,9 +61,32 @@ def copy_flux(repository: FSRepository) -> Flux: | ||||
|     return raw_brz_copy_username | ||||
|  | ||||
|  | ||||
| @pytest.fixture | ||||
| def foo_flux(repository: FSRepository) -> Flux: | ||||
|     src = [ | ||||
|         repository.table("test-raw-username"), | ||||
|         repository.table("test-raw-recovery"), | ||||
|     ] | ||||
|     targets = [repository.table("test-bronze-foo")] | ||||
|  | ||||
|     def foo(sources, targets): | ||||
|         return {"who": "foo"} | ||||
|  | ||||
|     extra_kwrds = {} | ||||
|  | ||||
|     flux = Flux( | ||||
|         id="foo_flux", | ||||
|         name="foo", | ||||
|         sources=src, | ||||
|         targets=targets, | ||||
|         transformation=Transformation(function=foo, extra_kwrds=extra_kwrds), | ||||
|     ) | ||||
|     return flux | ||||
|  | ||||
|  | ||||
| def test_add_flux(repository: FSRepository, copy_flux: Flux): | ||||
|     dataplatform = DataPlateform() | ||||
|     dataplatform.add_repository("test", repository) | ||||
|     dataplatform.add_repository(repository) | ||||
|  | ||||
|     dataplatform.add_flux(name="copy_flux", flux=copy_flux) | ||||
|     assert dataplatform.fluxes == ["copy_flux"] | ||||
| @@ -98,7 +105,7 @@ def dataplatform( | ||||
| ) -> DataPlateform: | ||||
|     dp = DataPlateform() | ||||
|  | ||||
|     dp.add_repository("test", repository) | ||||
|     dp.add_repository(repository) | ||||
|  | ||||
|     dp.add_flux("foo", foo_flux) | ||||
|     dp.add_flux("raw_brz_copy_username", copy_flux) | ||||
| @@ -106,27 +113,59 @@ def dataplatform( | ||||
|  | ||||
|  | ||||
| def test_listing_content(dataplatform: DataPlateform): | ||||
|     assert dataplatform.repository("test").schemas() == ["raw", "bronze", "silver"] | ||||
|     assert dataplatform.repository("test").schema("raw").tables == [ | ||||
|         "username", | ||||
|         "recovery", | ||||
|         "salary", | ||||
|     assert dataplatform.repository("test").schemas() == ["test-raw", "test-bronze", "test-silver"] | ||||
|     assert dataplatform.repository("test").schema("test-raw").tables == [ | ||||
|         "test-raw-username", | ||||
|         "test-raw-recovery", | ||||
|         "test-raw-salary", | ||||
|     ] | ||||
|     assert dataplatform.repository("test").table("raw", "username").partitions == ["username.csv"] | ||||
|     assert dataplatform.repository("test").table("raw", "recovery").partitions == [ | ||||
|     assert dataplatform.repository("test").table("test-raw-username").partitions == ["username.csv"] | ||||
|     assert dataplatform.repository("test").table("test-raw-recovery").partitions == [ | ||||
|         "2022.csv", | ||||
|         "2023.csv", | ||||
|         "2024.csv", | ||||
|     ] | ||||
|  | ||||
|  | ||||
| def test_content_from_graph(dataplatform: DataPlateform): | ||||
|     assert dataplatform.graph.nodes == { | ||||
|         Node(name="test-raw-recovery", infos={}), | ||||
|         Node(name="test-raw-salary", infos={}), | ||||
|         Node(name="test-raw-username", infos={}), | ||||
|     } | ||||
|  | ||||
|     assert dataplatform.graphset.node_sets == { | ||||
|         frozenset( | ||||
|             { | ||||
|                 Node(name="test-bronze-username"), | ||||
|             } | ||||
|         ), | ||||
|         frozenset( | ||||
|             { | ||||
|                 Node(name="test-bronze-foo"), | ||||
|             } | ||||
|         ), | ||||
|         frozenset( | ||||
|             { | ||||
|                 Node(name="test-raw-username"), | ||||
|             } | ||||
|         ), | ||||
|         frozenset( | ||||
|             { | ||||
|                 Node(name="test-raw-username"), | ||||
|                 Node(name="test-raw-recovery"), | ||||
|             } | ||||
|         ), | ||||
|     } | ||||
|  | ||||
|  | ||||
| def test_execute_flux(dataplatform: DataPlateform): | ||||
|     meta = dataplatform.execute_flux("foo") | ||||
|     assert meta.data == {"who": "foo"} | ||||
|  | ||||
|     assert dataplatform.repository("test").schema("bronze").tables == [] | ||||
|     assert dataplatform.repository("test").schema("test-bronze").tables == [] | ||||
|  | ||||
|     meta = dataplatform.execute_flux("raw_brz_copy_username") | ||||
|     assert meta.data == {"src_size": 283, "tgt_size": 283} | ||||
|  | ||||
|     assert dataplatform.repository("test").schema("bronze").tables == ["username"] | ||||
|     assert dataplatform.repository("test").schema("test-bronze").tables == ["test-bronze-username"] | ||||
|   | ||||
| @@ -1,6 +1,7 @@ | ||||
| import pytest | ||||
|  | ||||
| from plesna.graph.graph import Edge, Graph, Node | ||||
| from plesna.graph.graph import Graph | ||||
| from plesna.models.graphs import Edge, Node | ||||
|  | ||||
|  | ||||
| def test_append_nodess(): | ||||
| @@ -94,9 +95,7 @@ def test_get_sources_from(nodes, dag_edges): | ||||
|     assert graph.get_direct_sources_from(nodes["C"]) == set([nodes["A"], nodes["B"]]) | ||||
|     assert graph.get_direct_sources_from(nodes["D"]) == set([nodes["C"]]) | ||||
|  | ||||
|     assert graph.get_sources_from(nodes["D"]) == set( | ||||
|         [nodes["A"], nodes["B"], nodes["C"]] | ||||
|     ) | ||||
|     assert graph.get_sources_from(nodes["D"]) == set([nodes["A"], nodes["B"], nodes["C"]]) | ||||
|  | ||||
|  | ||||
| def test_valid_dage(dag_edges, notdag_edges): | ||||
|   | ||||
| @@ -1,4 +1,5 @@ | ||||
| from plesna.graph.graph_set import EdgeOnSet, GraphSet, Node | ||||
| from plesna.graph.graph_set import GraphSet | ||||
| from plesna.models.graphs import EdgeOnSet, Node | ||||
|  | ||||
|  | ||||
| def test_init(): | ||||
|   | ||||
| @@ -20,7 +20,7 @@ def location(tmp_path): | ||||
|  | ||||
|  | ||||
| def test_init(location): | ||||
|     repo = FSRepository("example", location, "example") | ||||
|     repo = FSRepository("example", "example", location) | ||||
|     assert repo.ls() == [ | ||||
|         "schema", | ||||
|     ] | ||||
| @@ -45,29 +45,50 @@ def test_init(location): | ||||
|  | ||||
| @pytest.fixture | ||||
| def repository(location) -> FSRepository: | ||||
|     return FSRepository("example", location, "example") | ||||
|     return FSRepository("repo_id", "example", location) | ||||
|  | ||||
|  | ||||
| def test_list_schema(location, repository): | ||||
|     assert repository.schemas() == ["schema"] | ||||
|     assert repository.schema("schema").name == "schema" | ||||
|     assert repository.schema("schema").id == str(location / "schema") | ||||
|     assert repository.schema("schema").repo_id == str(location) | ||||
|     assert repository.schema("schema").value == str(location / "schema") | ||||
|     assert repository.schema("schema").tables == ["username", "recovery", "salary"] | ||||
| def test_list_schemas(repository): | ||||
|     assert repository.schemas() == ["repo_id-schema"] | ||||
|  | ||||
|  | ||||
| def test_describe_schema(location, repository): | ||||
|     schema = repository.schema("repo_id-schema") | ||||
|     assert schema.name == "schema" | ||||
|     assert schema.id == "repo_id-schema" | ||||
|     assert schema.repo_id == "repo_id" | ||||
|     assert schema.value == str(location / "schema") | ||||
|     assert schema.tables == [ | ||||
|         "repo_id-schema-username", | ||||
|         "repo_id-schema-recovery", | ||||
|         "repo_id-schema-salary", | ||||
|     ] | ||||
|  | ||||
|  | ||||
| def test_list_tables_schema(repository): | ||||
|     assert repository.schema("schema").tables == ["username", "recovery", "salary"] | ||||
|     assert repository.tables(schema="schema") == ["username", "recovery", "salary"] | ||||
|     assert repository.schema("repo_id-schema").tables == [ | ||||
|         "repo_id-schema-username", | ||||
|         "repo_id-schema-recovery", | ||||
|         "repo_id-schema-salary", | ||||
|     ] | ||||
|     assert repository.tables("repo_id-schema") == [ | ||||
|         "repo_id-schema-username", | ||||
|         "repo_id-schema-recovery", | ||||
|         "repo_id-schema-salary", | ||||
|     ] | ||||
|     assert repository.tables() == [ | ||||
|         "repo_id-schema-username", | ||||
|         "repo_id-schema-recovery", | ||||
|         "repo_id-schema-salary", | ||||
|     ] | ||||
|  | ||||
|  | ||||
| def test_describe_table(location, repository): | ||||
|     table = repository.table("schema", "username") | ||||
|     table = repository.table("repo_id-schema-username") | ||||
|  | ||||
|     assert table.id == str(location / "schema" / "username") | ||||
|     assert table.repo_id == str(location) | ||||
|     assert table.schema_id == str(location / "schema") | ||||
|     assert table.id == "repo_id-schema-username" | ||||
|     assert table.repo_id == "repo_id" | ||||
|     assert table.schema_id == "repo_id-schema" | ||||
|     assert table.name == "username" | ||||
|     assert table.value == str(location / "schema" / "username") | ||||
|     assert table.partitions == ["username.csv"] | ||||
| @@ -75,11 +96,11 @@ def test_describe_table(location, repository): | ||||
|  | ||||
|  | ||||
| def test_describe_table_with_partitions(location, repository): | ||||
|     table = repository.table("schema", "recovery") | ||||
|     table = repository.table("repo_id-schema-recovery") | ||||
|  | ||||
|     assert table.id == str(location / "schema" / "recovery") | ||||
|     assert table.repo_id == str(location) | ||||
|     assert table.schema_id == str(location / "schema") | ||||
|     assert table.id == "repo_id-schema-recovery" | ||||
|     assert table.repo_id == "repo_id" | ||||
|     assert table.schema_id == "repo_id-schema" | ||||
|     assert table.name == "recovery" | ||||
|     assert table.value == str(location / "schema" / "recovery") | ||||
|     assert table.partitions == [ | ||||
|   | ||||
		Reference in New Issue
	
	Block a user