Compare commits
4 Commits
86f0dcc49e
...
b9dade2701
Author | SHA1 | Date | |
---|---|---|---|
b9dade2701 | |||
ed8f91d78b | |||
d1c1b7420d | |||
f0315d09b9 |
@ -1,5 +1,4 @@
|
||||
from functools import reduce
|
||||
from typing import Callable
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
@ -12,10 +12,9 @@ class Node(BaseModel):
|
||||
|
||||
|
||||
class EdgeOnSet(BaseModel):
|
||||
arrow: Callable
|
||||
sources: dict[str, Node]
|
||||
targets: dict[str, Node]
|
||||
edge_kwrds: dict = {}
|
||||
arrow: str
|
||||
sources: list[Node]
|
||||
targets: list[Node]
|
||||
|
||||
|
||||
class GraphSet:
|
||||
@ -25,8 +24,8 @@ class GraphSet:
|
||||
|
||||
def append(self, edge: EdgeOnSet):
|
||||
self._edges.append(edge)
|
||||
self._node_sets.add(frozenset(edge.sources.values()))
|
||||
self._node_sets.add(frozenset(edge.targets.values()))
|
||||
self._node_sets.add(frozenset(edge.sources))
|
||||
self._node_sets.add(frozenset(edge.targets))
|
||||
|
||||
@property
|
||||
def node_sets(self):
|
||||
|
0
plesna/libs/__init__.py
Normal file
0
plesna/libs/__init__.py
Normal file
18
plesna/libs/string_tools.py
Normal file
18
plesna/libs/string_tools.py
Normal file
@ -0,0 +1,18 @@
|
||||
import re
|
||||
|
||||
|
||||
class StringToolsError(ValueError):
    """Raised when a string cannot be parsed with the requested pattern."""


def extract_values_from_pattern(pattern, string):
    """Extract the values of ``{name}`` placeholders in *pattern* from *string*.

    Each ``{name}`` placeholder captures one or more characters (greedily);
    every other character of *pattern* is matched literally. The pattern is
    searched for inside *string*, it is not anchored to its start or end.

    Args:
        pattern: Template such as ``"id:{champ1}-{champ2}-machin"``.
        string: Text to parse.

    Returns:
        A dict mapping each placeholder name to the text it captured. If the
        same name appears several times, the last captured value wins.

    Raises:
        StringToolsError: If *string* does not match *pattern*.
    """
    # re.split with a capturing group alternates literal text (even indexes)
    # and placeholder names (odd indexes).
    parts = re.split(r"\{(.+?)\}", pattern)
    keys = parts[1::2]

    # Literal text is escaped so characters like "." or "(" are not read as
    # regex syntax; groups are unnamed so placeholder names do not have to be
    # valid Python identifiers.
    regex = "".join(
        "(.+)" if index % 2 else re.escape(part)
        for index, part in enumerate(parts)
    )

    search = re.search(regex, string)
    if search is None:
        raise StringToolsError(f"Can't parse '{string}' with the pattern '{pattern}'")

    return dict(zip(keys, search.groups()))
|
@ -120,12 +120,16 @@ class FSRepository(Repository):
|
||||
def _schema(self, name: str) -> FSSchema:
|
||||
"""List schemas (sub directories within basepath)"""
|
||||
schema_path = self._basepath / name
|
||||
tables = self.ls(name)
|
||||
tables = self.tables(schema=name)
|
||||
return FSSchema(name=name, path=schema_path, tables=tables)
|
||||
|
||||
def schema(self, name: str) -> Schema:
    """Return the ``ref`` of the schema named *name*, as built by ``_schema``."""
    fs_schema = self._schema(name)
    return fs_schema.ref
|
||||
|
||||
def tables(self, schema: str) -> list[str]:
    """List the tables of *schema*.

    Args:
        schema: Name of the schema to list; passed unchanged to ``self.ls``.

    Returns:
        Whatever ``self.ls(schema)`` returns.
    """
    # Return directly: the former temporary shadowed this method's own name.
    return self.ls(schema)
|
||||
|
||||
def _table(self, schema: str, name: str) -> FSTable:
|
||||
"""Get infos on the table"""
|
||||
table_path = self._basepath / schema / name
|
||||
|
@ -13,26 +13,11 @@ FIXTURE_DIR = Path(__file__).parent.parent / Path("raw_datas")
|
||||
|
||||
@pytest.fixture
|
||||
def repository(tmp_path) -> FSRepository:
|
||||
raw_path = Path(tmp_path) / "raw"
|
||||
raw_path.mkdir()
|
||||
|
||||
example_src = FIXTURE_DIR
|
||||
assert example_src.exists()
|
||||
|
||||
recovery_loc = raw_path / "recovery"
|
||||
recovery_loc.mkdir()
|
||||
username_loc = raw_path / "username"
|
||||
username_loc.mkdir()
|
||||
salary_loc = raw_path / "salary"
|
||||
salary_loc.mkdir()
|
||||
|
||||
for f in example_src.glob("*"):
|
||||
if "recovery" in str(f):
|
||||
shutil.copy(f, recovery_loc)
|
||||
elif "salary" in str(f):
|
||||
shutil.copy(f, salary_loc)
|
||||
else:
|
||||
shutil.copy(f, username_loc)
|
||||
raw_path = Path(tmp_path) / "raw"
|
||||
shutil.copytree(src=example_src.absolute(), dst=raw_path.absolute())
|
||||
|
||||
bronze_path = Path(tmp_path) / "bronze"
|
||||
bronze_path.mkdir()
|
||||
@ -123,11 +108,16 @@ def dataplatform(
|
||||
def test_listing_content(dataplatform: DataPlateform):
|
||||
assert dataplatform.repository("test").schemas() == ["raw", "bronze", "silver"]
|
||||
assert dataplatform.repository("test").schema("raw").tables == [
|
||||
"recovery",
|
||||
"username",
|
||||
"recovery",
|
||||
"salary",
|
||||
]
|
||||
assert dataplatform.repository("test").table("raw", "username").partitions == ["username.csv"]
|
||||
assert dataplatform.repository("test").table("raw", "recovery").partitions == [
|
||||
"2022.csv",
|
||||
"2023.csv",
|
||||
"2024.csv",
|
||||
]
|
||||
|
||||
|
||||
def test_execute_flux(dataplatform: DataPlateform):
|
||||
@ -137,6 +127,6 @@ def test_execute_flux(dataplatform: DataPlateform):
|
||||
assert dataplatform.repository("test").schema("bronze").tables == []
|
||||
|
||||
meta = dataplatform.execute_flux("raw_brz_copy_username")
|
||||
assert meta.data == {"src_size": 175, "tgt_size": 175}
|
||||
assert meta.data == {"src_size": 283, "tgt_size": 283}
|
||||
|
||||
assert dataplatform.repository("test").schema("bronze").tables == ["username"]
|
||||
|
@ -2,17 +2,13 @@ from plesna.graph.graph_set import EdgeOnSet, GraphSet, Node
|
||||
|
||||
|
||||
def test_init():
|
||||
graph_set = GraphSet()
|
||||
|
||||
nodeA = Node(name="A")
|
||||
nodeB = Node(name="B")
|
||||
nodeC = Node(name="C")
|
||||
edge1 = EdgeOnSet(arrow="arrow", sources=[nodeA, nodeB], targets=[nodeC])
|
||||
|
||||
def arrow(sources, targets):
|
||||
targets["C"].infos["res"] = sources["A"].name + sources["B"].name
|
||||
|
||||
edge1 = EdgeOnSet(
|
||||
arrow=arrow, sources={"A": nodeA, "B": nodeB}, targets={"C": nodeC}
|
||||
)
|
||||
graph_set = GraphSet()
|
||||
graph_set.append(edge1)
|
||||
|
||||
assert graph_set.node_sets == {frozenset([nodeA, nodeB]), frozenset([nodeC])}
|
||||
|
0
tests/libs/__init__.py
Normal file
0
tests/libs/__init__.py
Normal file
18
tests/libs/test_string_tools.py
Normal file
18
tests/libs/test_string_tools.py
Normal file
@ -0,0 +1,18 @@
|
||||
import pytest
|
||||
|
||||
from plesna.libs.string_tools import StringToolsError, extract_values_from_pattern
|
||||
|
||||
|
||||
def test_extract_values_from_pattern():
    """Placeholder values are returned keyed by their placeholder names."""
    extracted = extract_values_from_pattern(
        "id:{champ1}-{champ2}-machin",
        "id:truc-bidule-machin",
    )

    assert extracted == {"champ1": "truc", "champ2": "bidule"}
|
||||
|
||||
|
||||
def test_extract_values_from_pattern_no_match():
    """A string lacking the pattern's literal ``-machin`` suffix is rejected."""
    with pytest.raises(StringToolsError):
        extract_values_from_pattern("id:{champ1}-{champ2}-machin", "id:truc-bidule")
|
3
tests/raw_datas/recovery/2022.csv
Normal file
3
tests/raw_datas/recovery/2022.csv
Normal file
@ -0,0 +1,3 @@
|
||||
Identifier,One-time password
|
||||
9012,12se74
|
||||
2070,04ap67
|
|
4
tests/raw_datas/recovery/2023.csv
Normal file
4
tests/raw_datas/recovery/2023.csv
Normal file
@ -0,0 +1,4 @@
|
||||
Identifier,One-time password
|
||||
9012,32ui83
|
||||
9346,14ju73
|
||||
5079,09ja61
|
|
4
tests/raw_datas/recovery/2024.csv
Normal file
4
tests/raw_datas/recovery/2024.csv
Normal file
@ -0,0 +1,4 @@
|
||||
Identifier,One-time password
|
||||
9012,74iu23
|
||||
2070,12io89
|
||||
5079,85nc83
|
|
Binary file not shown.
Binary file not shown.
@ -1,7 +0,0 @@
|
||||
Username;Identifier;First name;Last name
|
||||
booker12;9012;Rachel;Booker
|
||||
grey07;2070;Laura;Grey
|
||||
johnson81;4081;Craig;Johnson
|
||||
jenkins46;9346;Mary;Jenkins
|
||||
smith79;5079;Jamie;Smith
|
||||
|
|
6
tests/raw_datas/username/username.csv
Normal file
6
tests/raw_datas/username/username.csv
Normal file
@ -0,0 +1,6 @@
|
||||
Username,Identifier,First name,Last name,Department,Location
|
||||
booker12,9012,Rachel,Booker,Sales,Manchester
|
||||
grey07,2070,Laura,Grey,Depot,London
|
||||
johnson81,4081,Craig,Johnson,Depot,London
|
||||
jenkins46,9346,Mary,Jenkins,Engineering,Manchester
|
||||
smith79,5079,Jamie,Smith,Engineering,Manchester
|
|
@ -3,7 +3,6 @@ from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from plesna.models.storage import Schema
|
||||
from plesna.storage.repository.fs_repository import FSRepository
|
||||
|
||||
FIXTURE_DIR = Path(__file__).parent.parent / Path("./raw_datas/")
|
||||
@ -11,37 +10,36 @@ FIXTURE_DIR = Path(__file__).parent.parent / Path("./raw_datas/")
|
||||
|
||||
@pytest.fixture
|
||||
def location(tmp_path):
|
||||
loc = tmp_path
|
||||
username_loc = loc / "username"
|
||||
username_loc.mkdir()
|
||||
salary_loc = loc / "salary"
|
||||
salary_loc.mkdir()
|
||||
schema = tmp_path / "schema"
|
||||
example_src = FIXTURE_DIR
|
||||
assert example_src.exists()
|
||||
|
||||
for f in example_src.glob("*"):
|
||||
if "username" in str(f):
|
||||
shutil.copy(f, username_loc)
|
||||
else:
|
||||
shutil.copy(f, salary_loc)
|
||||
shutil.copytree(src=example_src.absolute(), dst=schema.absolute())
|
||||
|
||||
return loc
|
||||
return tmp_path
|
||||
|
||||
|
||||
def test_init(location):
|
||||
repo = FSRepository("example", location, "example")
|
||||
assert repo.ls() == [
|
||||
"schema",
|
||||
]
|
||||
assert repo.ls(dir="schema") == [
|
||||
"username",
|
||||
"recovery",
|
||||
"salary",
|
||||
]
|
||||
|
||||
assert repo.ls(recursive=True) == [
|
||||
"username",
|
||||
"salary",
|
||||
"username/username.csv",
|
||||
"username/username-password-recovery-code.xlsx",
|
||||
"username/username-password-recovery-code.xls",
|
||||
"salary/salary.pdf",
|
||||
"schema",
|
||||
"schema/username",
|
||||
"schema/recovery",
|
||||
"schema/salary",
|
||||
"schema/username/username.csv",
|
||||
"schema/recovery/2022.csv",
|
||||
"schema/recovery/2023.csv",
|
||||
"schema/recovery/2024.csv",
|
||||
"schema/salary/salary.pdf",
|
||||
]
|
||||
|
||||
|
||||
@ -51,27 +49,46 @@ def repository(location) -> FSRepository:
|
||||
|
||||
|
||||
def test_list_schema(location, repository):
|
||||
assert repository.schemas() == ["username", "salary"]
|
||||
assert repository.schema("username").name == "username"
|
||||
assert repository.schema("username").id == str(location / "username")
|
||||
assert repository.schema("username").repo_id == str(location)
|
||||
assert repository.schema("username").value == str(location / "username")
|
||||
assert repository.schemas() == ["schema"]
|
||||
assert repository.schema("schema").name == "schema"
|
||||
assert repository.schema("schema").id == str(location / "schema")
|
||||
assert repository.schema("schema").repo_id == str(location)
|
||||
assert repository.schema("schema").value == str(location / "schema")
|
||||
assert repository.schema("schema").tables == ["username", "recovery", "salary"]
|
||||
|
||||
|
||||
def test_list_tables_schema(repository):
|
||||
assert repository.schema("username").tables == [
|
||||
"username.csv",
|
||||
"username-password-recovery-code.xlsx",
|
||||
"username-password-recovery-code.xls",
|
||||
]
|
||||
assert repository.schema("salary").tables == ["salary.pdf"]
|
||||
assert repository.schema("schema").tables == ["username", "recovery", "salary"]
|
||||
assert repository.tables(schema="schema") == ["username", "recovery", "salary"]
|
||||
|
||||
|
||||
def test_describe_table(location, repository):
|
||||
table = repository.table("username", "username.csv")
|
||||
assert table.id == str(location / "username" / "username.csv")
|
||||
table = repository.table("schema", "username")
|
||||
|
||||
assert table.id == str(location / "schema" / "username")
|
||||
assert table.repo_id == str(location)
|
||||
assert table.schema_id == str(location / "username")
|
||||
assert table.name == "username.csv"
|
||||
assert table.value == str(location / "username" / "username.csv")
|
||||
assert table.partitions == []
|
||||
assert table.schema_id == str(location / "schema")
|
||||
assert table.name == "username"
|
||||
assert table.value == str(location / "schema" / "username")
|
||||
assert table.partitions == ["username.csv"]
|
||||
assert table.datas == [table.value + "/username.csv"]
|
||||
|
||||
|
||||
def test_describe_table_with_partitions(location, repository):
    """The ``recovery`` table exposes its three yearly CSV files as partitions."""
    schema_dir = location / "schema"
    table_dir = schema_dir / "recovery"
    expected_partitions = ["2022.csv", "2023.csv", "2024.csv"]

    table = repository.table("schema", "recovery")

    assert table.id == str(table_dir)
    assert table.repo_id == str(location)
    assert table.schema_id == str(schema_dir)
    assert table.name == "recovery"
    assert table.value == str(table_dir)
    assert table.partitions == expected_partitions
    assert table.datas == [table.value + "/" + part for part in expected_partitions]
|
||||
|
Loading…
Reference in New Issue
Block a user