plesna/tests/storage/test_fs_repository.py

95 lines
2.8 KiB
Python

import shutil
from pathlib import Path
import pytest
from plesna.storage.repository.fs_repository import FSRepository
# Sample data directory: "raw_datas" next to this file's parent directory.
FIXTURE_DIR = Path(__file__).parent.parent / "raw_datas"
@pytest.fixture
def location(tmp_path):
    """Return ``tmp_path`` after copying the sample data into it.

    The whole ``FIXTURE_DIR`` tree is copied to ``tmp_path / "schema"``;
    the fixture fails early if ``FIXTURE_DIR`` does not exist.
    """
    assert FIXTURE_DIR.exists()
    schema_dir = tmp_path / "schema"
    shutil.copytree(src=FIXTURE_DIR.absolute(), dst=schema_dir.absolute())
    return tmp_path
def test_init(location):
    """Check ``FSRepository.ls`` at the root, inside "schema", and recursively."""
    # NOTE(review): every expectation below is order-sensitive, and the
    # directory order is not alphabetical. Confirm FSRepository.ls guarantees
    # this ordering rather than inheriting filesystem enumeration order.
    repo = FSRepository("example", location, "example")

    expected_root = ["schema"]
    assert repo.ls() == expected_root

    expected_in_schema = ["username", "recovery", "salary"]
    assert repo.ls(dir="schema") == expected_in_schema

    expected_recursive = [
        "schema",
        "schema/username",
        "schema/recovery",
        "schema/salary",
        "schema/username/username.csv",
        "schema/recovery/2022.csv",
        "schema/recovery/2023.csv",
        "schema/recovery/2024.csv",
        "schema/salary/salary.pdf",
    ]
    assert repo.ls(recursive=True) == expected_recursive
@pytest.fixture
def repository(location) -> FSRepository:
    """Provide an ``FSRepository`` built on the ``location`` fixture directory."""
    repo = FSRepository("example", location, "example")
    return repo
def test_list_schema(location, repository):
    """Check the schema listing and the attributes of the "schema" entry."""
    schema_path = str(location / "schema")
    table_names = ["username", "recovery", "salary"]

    assert repository.schemas() == ["schema"]
    # The schema is looked up again for every attribute on purpose, so each
    # assertion exercises a fresh ``repository.schema`` call.
    assert repository.schema("schema").name == "schema"
    assert repository.schema("schema").id == schema_path
    assert repository.schema("schema").repo_id == str(location)
    assert repository.schema("schema").value == schema_path
    assert repository.schema("schema").tables == table_names
def test_list_tables_schema(repository):
    """Check that both ways of listing the tables of "schema" agree."""
    table_names = ["username", "recovery", "salary"]

    # Via the schema object, then via the repository shortcut.
    assert repository.schema("schema").tables == table_names
    assert repository.tables(schema="schema") == table_names
def test_describe_table(location, repository):
    """Check the attributes of the single-file table "username"."""
    schema_path = str(location / "schema")
    table_path = str(location / "schema" / "username")

    table = repository.table("schema", "username")

    assert table.id == table_path
    assert table.repo_id == str(location)
    assert table.schema_id == schema_path
    assert table.name == "username"
    assert table.value == table_path
    assert table.partitions == ["username.csv"]
    # Data entries are expected to be the table value joined with each partition.
    assert table.datas == [table.value + "/username.csv"]
def test_describe_table_with_partitions(location, repository):
    """Check the attributes of the multi-file table "recovery"."""
    schema_path = str(location / "schema")
    table_path = str(location / "schema" / "recovery")
    partition_names = ["2022.csv", "2023.csv", "2024.csv"]

    table = repository.table("schema", "recovery")

    assert table.id == table_path
    assert table.repo_id == str(location)
    assert table.schema_id == schema_path
    assert table.name == "recovery"
    assert table.value == table_path
    assert table.partitions == partition_names
    # Each data entry is expected to be the table value joined with one partition.
    assert table.datas == [table.value + f"/{name}" for name in partition_names]