refact: reorganize raw_datas and adapt tests

This commit is contained in:
Bertrand Benjamin 2025-01-05 06:42:51 +01:00
parent 86f0dcc49e
commit f0315d09b9
10 changed files with 77 additions and 61 deletions

View File

@ -13,26 +13,11 @@ FIXTURE_DIR = Path(__file__).parent.parent / Path("raw_datas")
@pytest.fixture
def repository(tmp_path) -> FSRepository:
raw_path = Path(tmp_path) / "raw"
raw_path.mkdir()
example_src = FIXTURE_DIR
assert example_src.exists()
recovery_loc = raw_path / "recovery"
recovery_loc.mkdir()
username_loc = raw_path / "username"
username_loc.mkdir()
salary_loc = raw_path / "salary"
salary_loc.mkdir()
for f in example_src.glob("*"):
if "recovery" in str(f):
shutil.copy(f, recovery_loc)
elif "salary" in str(f):
shutil.copy(f, salary_loc)
else:
shutil.copy(f, username_loc)
raw_path = Path(tmp_path) / "raw"
shutil.copytree(src=example_src.absolute(), dst=raw_path.absolute())
bronze_path = Path(tmp_path) / "bronze"
bronze_path.mkdir()
@ -123,11 +108,16 @@ def dataplatform(
def test_listing_content(dataplatform: DataPlateform):
assert dataplatform.repository("test").schemas() == ["raw", "bronze", "silver"]
assert dataplatform.repository("test").schema("raw").tables == [
"recovery",
"username",
"recovery",
"salary",
]
assert dataplatform.repository("test").table("raw", "username").partitions == ["username.csv"]
assert dataplatform.repository("test").table("raw", "recovery").partitions == [
"2022.csv",
"2023.csv",
"2024.csv",
]
def test_execute_flux(dataplatform: DataPlateform):
@ -137,6 +127,6 @@ def test_execute_flux(dataplatform: DataPlateform):
assert dataplatform.repository("test").schema("bronze").tables == []
meta = dataplatform.execute_flux("raw_brz_copy_username")
assert meta.data == {"src_size": 175, "tgt_size": 175}
assert meta.data == {"src_size": 283, "tgt_size": 283}
assert dataplatform.repository("test").schema("bronze").tables == ["username"]

View File

@ -0,0 +1,3 @@
Identifier,One-time password
9012,12se74
2070,04ap67
1 Identifier One-time password
2 9012 12se74
3 2070 04ap67

View File

@ -0,0 +1,4 @@
Identifier,One-time password
9012,32ui83
9346,14ju73
5079,09ja61
1 Identifier One-time password
2 9012 32ui83
3 9346 14ju73
4 5079 09ja61

View File

@ -0,0 +1,4 @@
Identifier,One-time password
9012,74iu23
2070,12io89
5079,85nc83
1 Identifier One-time password
2 9012 74iu23
3 2070 12io89
4 5079 85nc83

View File

@ -1,7 +0,0 @@
Username;Identifier;First name;Last name
booker12;9012;Rachel;Booker
grey07;2070;Laura;Grey
johnson81;4081;Craig;Johnson
jenkins46;9346;Mary;Jenkins
smith79;5079;Jamie;Smith
1 Username Identifier First name Last name
2 booker12 9012 Rachel Booker
3 grey07 2070 Laura Grey
4 johnson81 4081 Craig Johnson
5 jenkins46 9346 Mary Jenkins
6 smith79 5079 Jamie Smith

View File

@ -0,0 +1,6 @@
Username,Identifier,First name,Last name,Department,Location
booker12,9012,Rachel,Booker,Sales,Manchester
grey07,2070,Laura,Grey,Depot,London
johnson81,4081,Craig,Johnson,Depot,London
jenkins46,9346,Mary,Jenkins,Engineering,Manchester
smith79,5079,Jamie,Smith,Engineering,Manchester
1 Username Identifier First name Last name Department Location
2 booker12 9012 Rachel Booker Sales Manchester
3 grey07 2070 Laura Grey Depot London
4 johnson81 4081 Craig Johnson Depot London
5 jenkins46 9346 Mary Jenkins Engineering Manchester
6 smith79 5079 Jamie Smith Engineering Manchester

View File

@ -3,7 +3,6 @@ from pathlib import Path
import pytest
from plesna.models.storage import Schema
from plesna.storage.repository.fs_repository import FSRepository
FIXTURE_DIR = Path(__file__).parent.parent / Path("./raw_datas/")
@ -11,37 +10,36 @@ FIXTURE_DIR = Path(__file__).parent.parent / Path("./raw_datas/")
@pytest.fixture
def location(tmp_path):
loc = tmp_path
username_loc = loc / "username"
username_loc.mkdir()
salary_loc = loc / "salary"
salary_loc.mkdir()
schema = tmp_path / "schema"
example_src = FIXTURE_DIR
assert example_src.exists()
for f in example_src.glob("*"):
if "username" in str(f):
shutil.copy(f, username_loc)
else:
shutil.copy(f, salary_loc)
shutil.copytree(src=example_src.absolute(), dst=schema.absolute())
return loc
return tmp_path
def test_init(location):
repo = FSRepository("example", location, "example")
assert repo.ls() == [
"schema",
]
assert repo.ls(dir="schema") == [
"username",
"recovery",
"salary",
]
assert repo.ls(recursive=True) == [
"username",
"salary",
"username/username.csv",
"username/username-password-recovery-code.xlsx",
"username/username-password-recovery-code.xls",
"salary/salary.pdf",
"schema",
"schema/username",
"schema/recovery",
"schema/salary",
"schema/username/username.csv",
"schema/recovery/2022.csv",
"schema/recovery/2023.csv",
"schema/recovery/2024.csv",
"schema/salary/salary.pdf",
]
@ -51,27 +49,45 @@ def repository(location) -> FSRepository:
def test_list_schema(location, repository):
assert repository.schemas() == ["username", "salary"]
assert repository.schema("username").name == "username"
assert repository.schema("username").id == str(location / "username")
assert repository.schema("username").repo_id == str(location)
assert repository.schema("username").value == str(location / "username")
assert repository.schemas() == ["schema"]
assert repository.schema("schema").name == "schema"
assert repository.schema("schema").id == str(location / "schema")
assert repository.schema("schema").repo_id == str(location)
assert repository.schema("schema").value == str(location / "schema")
assert repository.schema("schema").tables == ["username", "recovery", "salary"]
def test_list_tables_schema(repository):
assert repository.schema("username").tables == [
"username.csv",
"username-password-recovery-code.xlsx",
"username-password-recovery-code.xls",
]
assert repository.schema("salary").tables == ["salary.pdf"]
assert repository.schema("schema").tables == ["username", "recovery", "salary"]
def test_describe_table(location, repository):
table = repository.table("username", "username.csv")
assert table.id == str(location / "username" / "username.csv")
table = repository.table("schema", "username")
assert table.id == str(location / "schema" / "username")
assert table.repo_id == str(location)
assert table.schema_id == str(location / "username")
assert table.name == "username.csv"
assert table.value == str(location / "username" / "username.csv")
assert table.partitions == []
assert table.schema_id == str(location / "schema")
assert table.name == "username"
assert table.value == str(location / "schema" / "username")
assert table.partitions == ["username.csv"]
assert table.datas == [table.value + "/username.csv"]
def test_describe_table_with_partitions(location, repository):
table = repository.table("schema", "recovery")
assert table.id == str(location / "schema" / "recovery")
assert table.repo_id == str(location)
assert table.schema_id == str(location / "schema")
assert table.name == "recovery"
assert table.value == str(location / "schema" / "recovery")
assert table.partitions == [
"2022.csv",
"2023.csv",
"2024.csv",
]
assert table.datas == [
table.value + "/2022.csv",
table.value + "/2023.csv",
table.value + "/2024.csv",
]