Feat: create datacatalogue with fs_datacatalogue
This commit is contained in:
0
tests/datastore/__init__.py
Normal file
0
tests/datastore/__init__.py
Normal file
BIN
tests/datastore/fs_files/salary.pdf
Normal file
BIN
tests/datastore/fs_files/salary.pdf
Normal file
Binary file not shown.
BIN
tests/datastore/fs_files/username-password-recovery-code.xls
Normal file
BIN
tests/datastore/fs_files/username-password-recovery-code.xls
Normal file
Binary file not shown.
BIN
tests/datastore/fs_files/username-password-recovery-code.xlsx
Normal file
BIN
tests/datastore/fs_files/username-password-recovery-code.xlsx
Normal file
Binary file not shown.
7
tests/datastore/fs_files/username.csv
Normal file
7
tests/datastore/fs_files/username.csv
Normal file
@@ -0,0 +1,7 @@
|
||||
Username;Identifier;First name;Last name
|
||||
booker12;9012;Rachel;Booker
|
||||
grey07;2070;Laura;Grey
|
||||
johnson81;4081;Craig;Johnson
|
||||
jenkins46;9346;Mary;Jenkins
|
||||
smith79;5079;Jamie;Smith
|
||||
|
||||
|
72
tests/datastore/test_fs_datacatalogue.py
Normal file
72
tests/datastore/test_fs_datacatalogue.py
Normal file
@@ -0,0 +1,72 @@
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from plesna.datastore.fs_datacatalogue import FSDataCatalogue
|
||||
|
||||
FIXTURE_DIR = Path(__file__).parent / Path("./fs_files/")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def location(tmp_path):
|
||||
loc = tmp_path
|
||||
username_loc = loc / "username"
|
||||
username_loc.mkdir()
|
||||
salary_loc = loc / "salary"
|
||||
salary_loc.mkdir()
|
||||
example_src = FIXTURE_DIR
|
||||
assert example_src.exists()
|
||||
|
||||
for f in example_src.glob("*"):
|
||||
if "username" in str(f):
|
||||
shutil.copy(f, username_loc)
|
||||
else:
|
||||
shutil.copy(f, salary_loc)
|
||||
|
||||
return loc
|
||||
|
||||
|
||||
def test_init(location):
|
||||
repo = FSDataCatalogue("example", location)
|
||||
assert repo.ls() == [
|
||||
"username",
|
||||
"salary",
|
||||
]
|
||||
|
||||
assert repo.ls(recursive=True) == [
|
||||
"username",
|
||||
"salary",
|
||||
]
|
||||
|
||||
|
||||
def test_list_schema(location):
|
||||
repo = FSDataCatalogue("example", location)
|
||||
assert {id: s.model_dump()["id"] for id, s in repo.schemas().items()} == {
|
||||
".": ".",
|
||||
"username": "username",
|
||||
"salary": "salary",
|
||||
}
|
||||
assert {id: s.model_dump()["value"] for id, s in repo.schemas().items()} == {
|
||||
".": ".",
|
||||
"username": "username",
|
||||
"salary": "salary",
|
||||
}
|
||||
assert {id: s.model_dump()["path"] for id, s in repo.schemas().items()} == {
|
||||
".": Path("."),
|
||||
"username": Path("username"),
|
||||
"salary": Path("salary"),
|
||||
}
|
||||
|
||||
|
||||
def test_list_tables(location):
|
||||
repo = FSDataCatalogue("example", location)
|
||||
assert repo.tables() == {}
|
||||
assert {id: t.model_dump()["value"] for id,t in repo.tables("username").items()} == {
|
||||
"username.csv": "username.csv",
|
||||
"username-password-recovery-code.xlsx": "username-password-recovery-code.xlsx",
|
||||
"username-password-recovery-code.xls": "username-password-recovery-code.xls",
|
||||
}
|
||||
assert {id: t.model_dump()["value"] for id,t in repo.tables("salary").items()} == {
|
||||
"salary.pdf": "salary.pdf",
|
||||
}
|
||||
Reference in New Issue
Block a user