Compare commits

..

No commits in common. "f2ed76c8aa108c70513948db8ce8305c6a81a5a7" and "b9dade270124d8a56b34064f85034ccb011de4e1" have entirely different histories.

14 changed files with 209 additions and 332 deletions

View File

@ -3,6 +3,6 @@ from plesna.models.flux import Flux, FluxMetaData
def consume_flux(flux: Flux) -> FluxMetaData: def consume_flux(flux: Flux) -> FluxMetaData:
metadata = flux.transformation.function( metadata = flux.transformation.function(
sources=flux.sources_dict, targets=flux.targets_dict, **flux.transformation.extra_kwrds sources=flux.sources, targets=flux.targets, **flux.transformation.extra_kwrds
) )
return FluxMetaData(data=metadata) return FluxMetaData(data=metadata)

View File

@ -1,8 +1,6 @@
from plesna.compute.consume_flux import consume_flux from plesna.compute.consume_flux import consume_flux
from plesna.graph.graph import Graph, Node from plesna.graph.graph_set import GraphSet
from plesna.graph.graph_set import EdgeOnSet, GraphSet
from plesna.models.flux import Flux, FluxMetaData from plesna.models.flux import Flux, FluxMetaData
from plesna.models.libs.flux_graph import flux_to_edgeonset
from plesna.storage.repository.repository import Repository from plesna.storage.repository.repository import Repository
@ -13,52 +11,31 @@ class DataPlateformError(Exception):
class DataPlateform: class DataPlateform:
def __init__(self): def __init__(self):
self._graphset = GraphSet() self._graphset = GraphSet()
self._graph = Graph()
self._metadata_engine = "" self._metadata_engine = ""
self._fluxes = {} self._fluxes = {}
self._repositories = {} self._repositories = {}
@property def add_repository(self, name: str, repository: Repository) -> str:
def graphset(self) -> GraphSet: if name in self._repositories:
return self._graphset raise DataPlateformError("The repository {name} already exists")
@property self._repositories[name] = repository
def graph(self) -> Graph: return name
return self._graph
def repository_graph_feed(self, repository_id: str):
for schema in self._repositories[repository_id].schemas():
for table in self._repositories[repository_id].tables(schema):
self._graph.add_node(Node(name=table))
def add_repository(self, repository: Repository) -> str:
if repository.id in self._repositories:
raise DataPlateformError("The repository {repository.id} already exists")
self._repositories[repository.id] = repository
self.repository_graph_feed(repository.id)
return repository.id
@property @property
def repositories(self) -> list[str]: def repositories(self) -> list[str]:
return list(self._repositories) return list(self._repositories)
def repository(self, id: str) -> Repository: def repository(self, name: str) -> Repository:
return self._repositories[id] return self._repositories[name]
def add_flux(self, name: str, flux: Flux) -> str: def add_flux(self, name: str, flux: Flux) -> str:
if name in self._fluxes: if name in self._fluxes:
raise DataPlateformError("The flux {name} already exists") raise DataPlateformError("The flux {name} already exists")
self._fluxes[name] = flux self._fluxes[name] = flux
self.flux_graphset_feed(name)
return name return name
def flux_graphset_feed(self, flux_name: str):
flux = self.flux(flux_name)
edge = flux_to_edgeonset(flux)
self._graphset.append(edge)
@property @property
def fluxes(self) -> list[str]: def fluxes(self) -> list[str]:
return list(self._fluxes) return list(self._fluxes)

View File

@ -1,6 +1,21 @@
from pydantic import BaseModel
from functools import reduce from functools import reduce
from plesna.models.graphs import Node, Edge
from pydantic import BaseModel
class Node(BaseModel):
name: str
infos: dict = {}
def __hash__(self):
return hash(self.name)
class Edge(BaseModel):
arrow_name: str
source: Node
target: Node
edge_kwrds: dict = {}
class Graph: class Graph:

View File

@ -1,4 +1,20 @@
from plesna.models.graphs import EdgeOnSet from typing import Callable
from pydantic import BaseModel
class Node(BaseModel):
name: str
infos: dict = {}
def __hash__(self):
return hash(self.name)
class EdgeOnSet(BaseModel):
arrow: str
sources: list[Node]
targets: list[Node]
class GraphSet: class GraphSet:

View File

@ -1,48 +1,14 @@
from collections.abc import Callable from pydantic import BaseModel
from pydantic import BaseModel, computed_field
from plesna.models.storage import Table from plesna.models.storage import Table
from plesna.models.transformation import Transformation
class Transformation(BaseModel):
"""
The function have to have at least 2 arguments: sources and targets
Other arguments will came throught extra_kwrds
The function will have to return metadata as dict
"""
function: Callable
extra_kwrds: dict = {}
class Flux(BaseModel): class Flux(BaseModel):
id: str sources: dict[str, Table]
name: str targets: dict[str, Table]
sources: list[Table]
targets: list[Table]
transformation: Transformation transformation: Transformation
@computed_field
@property
def sources_dict(self) -> dict[str, Table]:
return {s.id: s for s in self.sources}
@computed_field
@property
def sources_id(self) -> dict[str, Table]:
return [s.id for s in self.sources]
@computed_field
@property
def targets_id(self) -> dict[str, Table]:
return [s.id for s in self.targets]
@computed_field
@property
def targets_dict(self) -> dict[str, Table]:
return {s.id: s for s in self.targets}
class FluxMetaData(BaseModel): class FluxMetaData(BaseModel):
data: dict data: dict

View File

@ -1,21 +0,0 @@
from pydantic import BaseModel
class Node(BaseModel):
name: str
def __hash__(self):
return hash(self.name)
class Edge(BaseModel):
arrow_name: str
source: Node
target: Node
edge_kwrds: dict = {}
class EdgeOnSet(BaseModel):
arrow: str
sources: list[Node]
targets: list[Node]

View File

@ -0,0 +1,15 @@
from collections.abc import Callable
from pydantic import BaseModel
class Transformation(BaseModel):
"""
The function have to have at least 2 arguments: sources and targets
Other arguments will came throught extra_kwrds
The function will have to return metadata as dict
"""
function: Callable
extra_kwrds: dict = {}

View File

@ -2,16 +2,29 @@ from pathlib import Path
from pydantic import BaseModel, computed_field from pydantic import BaseModel, computed_field
from plesna.libs.string_tools import extract_values_from_pattern
from plesna.models.storage import Partition, Schema, Table from plesna.models.storage import Partition, Schema, Table
from plesna.storage.repository.repository import Repository from plesna.storage.repository.repository import Repository
class FSPartition(BaseModel):
name: str
path: Path
@computed_field
@property
def ref(self) -> Partition:
return Partition(
id=str(self.path),
repo_id=str(self.path.parent.parent.parent),
schema_id=str(self.path.parent.parent),
table_id=str(self.path.parent),
name=self.name,
value=str(self.path.absolute()),
)
class FSTable(BaseModel): class FSTable(BaseModel):
name: str name: str
repo_id: str
schema_id: str
id: str
path: Path path: Path
is_partitionned: bool is_partitionned: bool
partitions: list[str] = [] partitions: list[str] = []
@ -25,9 +38,9 @@ class FSTable(BaseModel):
datas = [str(self.path.absolute())] datas = [str(self.path.absolute())]
return Table( return Table(
id=self.id, id=str(self.path),
repo_id=self.repo_id, repo_id=str(self.path.parent.parent),
schema_id=self.schema_id, schema_id=str(self.path.parent),
name=self.name, name=self.name,
value=str(self.path.absolute()), value=str(self.path.absolute()),
partitions=self.partitions, partitions=self.partitions,
@ -37,8 +50,6 @@ class FSTable(BaseModel):
class FSSchema(BaseModel): class FSSchema(BaseModel):
name: str name: str
repo_id: str
id: str
path: Path path: Path
tables: list[str] tables: list[str]
@ -46,18 +57,14 @@ class FSSchema(BaseModel):
@property @property
def ref(self) -> Schema: def ref(self) -> Schema:
return Schema( return Schema(
id=self.id, id=str(self.path),
repo_id=self.repo_id, repo_id=str(self.path.parent),
name=self.name, name=self.name,
value=str(self.path.absolute()), value=str(self.path.absolute()),
tables=self.tables, tables=self.tables,
) )
class FSRepositoryError(ValueError):
pass
class FSRepository(Repository): class FSRepository(Repository):
"""Repository based on files tree structure """Repository based on files tree structure
@ -67,15 +74,11 @@ class FSRepository(Repository):
""" """
ID_FMT = { def __init__(self, name: str, basepath: str, id: str):
"schema": "{repo_id}-{schema_name}",
"table": "{schema_id}-{table_name}",
}
def __init__(self, id: str, name: str, basepath: str):
super().__init__(id, name)
self._basepath = Path(basepath) self._basepath = Path(basepath)
self.name = name
self.id = id
assert self._basepath.exists() assert self._basepath.exists()
def ls(self, dir="", only_files=False, only_directories=False, recursive=False) -> list[str]: def ls(self, dir="", only_files=False, only_directories=False, recursive=False) -> list[str]:
@ -109,89 +112,47 @@ class FSRepository(Repository):
return [str(f.relative_to(dirpath)) for f in paths if not str(f).startswith(".")] return [str(f.relative_to(dirpath)) for f in paths if not str(f).startswith(".")]
def parse_id(self, string: str, id_type: str) -> dict:
if id_type not in self.ID_FMT:
raise FSRepositoryError(
"Wrong id_type. Gots {id_type} needs to be one of {self.ID_FMT.values}"
)
parsed = extract_values_from_pattern(self.ID_FMT[id_type], string)
if not parsed:
raise FSRepositoryError(
f"Wrong format for {id_type}. Got {string} need {self.ID_FMT['id_type']}"
)
return parsed
def schemas(self) -> list[str]: def schemas(self) -> list[str]:
"""List schemas (sub directories within basepath)""" """List schemas (sub directories within basepath)"""
subdirectories = self.ls("", only_directories=True) subdirectories = self.ls("", only_directories=True)
return [ return [str(d) for d in subdirectories]
self.ID_FMT["schema"].format(repo_id=self.id, schema_name=d) for d in subdirectories
]
def _schema(self, schema_id: str) -> FSSchema: def _schema(self, name: str) -> FSSchema:
"""List schemas (sub directories within basepath)""" """List schemas (sub directories within basepath)"""
parsed = self.parse_id(schema_id, "schema") schema_path = self._basepath / name
tables = self.tables(schema=name)
return FSSchema(name=name, path=schema_path, tables=tables)
repo_id = parsed["repo_id"] def schema(self, name: str) -> Schema:
schema_name = parsed["schema_name"] return self._schema(name).ref
schema_path = self._basepath / schema_name
if repo_id != self.id: def tables(self, schema: str) -> list[str]:
raise FSRepositoryError("Trying to get schema that don't belong in this repository") tables = self.ls(schema)
tables = self.tables(schema_id)
return FSSchema(
name=schema_name,
id=schema_id,
repo_id=self.id,
schema_id=schema_id,
path=schema_path,
tables=tables,
)
def schema(self, schema_id: str) -> Schema:
return self._schema(schema_id).ref
def _tables(self, schema_id: str) -> list[str]:
parsed = self.parse_id(schema_id, "schema")
tables = self.ls(parsed["schema_name"])
return [self.ID_FMT["table"].format(table_name=t, schema_id=schema_id) for t in tables]
def tables(self, schema_id: str = "") -> list[str]:
if schema_id:
return self._tables(schema_id)
tables = []
for schema in self.schemas():
tables += self._tables(schema)
return tables return tables
def _table(self, table_id: str) -> FSTable: def _table(self, schema: str, name: str) -> FSTable:
"""Get infos on the table""" """Get infos on the table"""
parsed = self.parse_id(table_id, "table") table_path = self._basepath / schema / name
schema = self._schema(parsed["schema_id"])
if not schema.path.exists():
raise FSRepositoryError(f"The schema {schema.id} does not exists.")
table_subpath = f"{schema.name}/{parsed['table_name']}"
table_path = self._basepath / table_subpath
is_partitionned = table_path.is_dir() is_partitionned = table_path.is_dir()
if is_partitionned: if is_partitionned:
partitions = self.ls(table_subpath, only_files=True) partitions = self.ls(f"{schema}/{name}", only_files=True)
else: else:
partitions = [] partitions = []
return FSTable( return FSTable(
name=parsed["table_name"], name=name,
id=table_id,
repo_id=self.id,
schema_id=schema.id,
path=table_path, path=table_path,
is_partitionned=is_partitionned, is_partitionned=is_partitionned,
partitions=partitions, partitions=partitions,
) )
def table(self, table_id: str) -> Table: def table(self, schema: str, name: str) -> Table:
return self._table(table_id).ref return self._table(schema, name).ref
def _partition(self, schema: str, table: str, partition: str) -> FSPartition:
"""Get infos on the partition"""
table_path = self._basepath / schema / table
return FSPartition(name=partition, table_path=table_path)
def partition(self, schema: str, name: str) -> Partition:
return self._partition(schema, name).ref

View File

@ -4,34 +4,35 @@ from plesna.models.storage import Partition, Schema, Table
class Repository: class Repository:
def __init__(self, id: str, name: str): def __init__(self):
self._id = id pass
self._name = name
@property
def id(self) -> str:
return self._id
@property
def name(self) -> str:
return self._name
@abc.abstractmethod @abc.abstractmethod
def schemas(self) -> list[str]: def schemas(self) -> list[str]:
"""List schema's ids""" """List schema's names"""
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def schema(self, schema_id: str) -> Schema: def schema(self, name: str) -> Schema:
"""Get the schema properties""" """Get the schema properties"""
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def tables(self, schema_id: str) -> list[str]: def tables(self, schema: str) -> list[str]:
"""List table's name in schema (the id)""" """List table's name in schema"""
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def table(self, table_id: str) -> Table: def table(self, schema: str, name: str) -> Table:
"""Get the table properties (the id)""" """Get the table properties"""
raise NotImplementedError
@abc.abstractmethod
def partitions(self, schema: str, table: str) -> list[str]:
"""List partition's name in table"""
raise NotImplementedError
@abc.abstractmethod
def partition(self, schema: str, name: str, partition: str) -> Partition:
"""Get the partition properties"""
raise NotImplementedError raise NotImplementedError

View File

@ -1,17 +1,26 @@
from plesna.compute.consume_flux import consume_flux from plesna.compute.consume_flux import consume_flux
from plesna.models.flux import Flux, Transformation from plesna.models.flux import Flux
from plesna.models.storage import Table from plesna.models.storage import Table
from plesna.models.transformation import Transformation
def test_consume_flux(): def test_consume_flux():
sources = [ sources = {
Table(id="src1", repo_id="test", schema_id="test", name="test", value="here", datas=["d"]), "src1": Table(
Table(id="src2", repo_id="test", schema_id="test", name="test", value="here", datas=["d"]), id="src1", repo_id="test", schema_id="test", name="test", value="here", datas=["d"]
] ),
targets = [ "src2": Table(
Table(id="tgt1", repo_id="test", schema_id="test", name="test", value="this", datas=["d"]), id="src2", repo_id="test", schema_id="test", name="test", value="here", datas=["d"]
Table(id="tgt2", repo_id="test", schema_id="test", name="test", value="that", datas=["d"]), ),
] }
targets = {
"tgt1": Table(
id="tgt1", repo_id="test", schema_id="test", name="test", value="this", datas=["d"]
),
"tgt2": Table(
id="tgt2", repo_id="test", schema_id="test", name="test", value="that", datas=["d"]
),
}
def func(sources, targets, **kwrds): def func(sources, targets, **kwrds):
return { return {
@ -21,8 +30,6 @@ def test_consume_flux():
} }
flux = Flux( flux = Flux(
id="flux",
name="flux",
sources=sources, sources=sources,
targets=targets, targets=targets,
transformation=Transformation(function=func, extra_kwrds={"extra": "super"}), transformation=Transformation(function=func, extra_kwrds={"extra": "super"}),

View File

@ -4,8 +4,8 @@ from pathlib import Path
import pytest import pytest
from plesna.dataplatform import DataPlateform from plesna.dataplatform import DataPlateform
from plesna.models.graphs import Node from plesna.models.flux import Flux
from plesna.models.flux import Flux, Transformation from plesna.models.transformation import Transformation
from plesna.storage.repository.fs_repository import FSRepository from plesna.storage.repository.fs_repository import FSRepository
FIXTURE_DIR = Path(__file__).parent.parent / Path("raw_datas") FIXTURE_DIR = Path(__file__).parent.parent / Path("raw_datas")
@ -24,50 +24,24 @@ def repository(tmp_path) -> FSRepository:
silver_path = Path(tmp_path) / "silver" silver_path = Path(tmp_path) / "silver"
silver_path.mkdir() silver_path.mkdir()
return FSRepository("test", "test", tmp_path) return FSRepository("test", tmp_path, "test")
def test_add_repository( def test_add_repository(
repository: FSRepository, repository: FSRepository,
): ):
dp = DataPlateform() dp = DataPlateform()
dp.add_repository(repository) dp.add_repository("test", repository)
assert dp.repositories == ["test"] assert dp.repositories == ["test"]
assert dp.repository("test") == repository assert dp.repository("test") == repository
@pytest.fixture
def copy_flux(repository: FSRepository) -> Flux:
raw_username = [repository.table("test-raw-username")]
bronze_username = [repository.table("test-bronze-username")]
def copy(sources, targets):
src_path = Path(sources["test-raw-username"].datas[0])
tgt_path = Path(targets["test-bronze-username"].datas[0])
shutil.copy(src_path, tgt_path)
return {"src_size": src_path.stat().st_size, "tgt_size": tgt_path.stat().st_size}
extra_kwrds = {}
raw_brz_copy_username = Flux(
id="copy_flux",
name="copy",
sources=raw_username,
targets=bronze_username,
transformation=Transformation(function=copy, extra_kwrds=extra_kwrds),
)
return raw_brz_copy_username
@pytest.fixture @pytest.fixture
def foo_flux(repository: FSRepository) -> Flux: def foo_flux(repository: FSRepository) -> Flux:
src = [ src = {"username": repository.table("raw", "username")}
repository.table("test-raw-username"), targets = {"username": repository.table("bronze", "username")}
repository.table("test-raw-recovery"),
]
targets = [repository.table("test-bronze-foo")]
def foo(sources, targets): def foo(sources, targets):
return {"who": "foo"} return {"who": "foo"}
@ -75,8 +49,6 @@ def foo_flux(repository: FSRepository) -> Flux:
extra_kwrds = {} extra_kwrds = {}
flux = Flux( flux = Flux(
id="foo_flux",
name="foo",
sources=src, sources=src,
targets=targets, targets=targets,
transformation=Transformation(function=foo, extra_kwrds=extra_kwrds), transformation=Transformation(function=foo, extra_kwrds=extra_kwrds),
@ -84,9 +56,30 @@ def foo_flux(repository: FSRepository) -> Flux:
return flux return flux
@pytest.fixture
def copy_flux(repository: FSRepository) -> Flux:
raw_username = {"username": repository.table("raw", "username")}
bronze_username = {"username": repository.table("bronze", "username")}
def copy(sources, targets):
src_path = Path(sources["username"].datas[0])
tgt_path = Path(targets["username"].datas[0])
shutil.copy(src_path, tgt_path)
return {"src_size": src_path.stat().st_size, "tgt_size": tgt_path.stat().st_size}
extra_kwrds = {}
raw_brz_copy_username = Flux(
sources=raw_username,
targets=bronze_username,
transformation=Transformation(function=copy, extra_kwrds=extra_kwrds),
)
return raw_brz_copy_username
def test_add_flux(repository: FSRepository, copy_flux: Flux): def test_add_flux(repository: FSRepository, copy_flux: Flux):
dataplatform = DataPlateform() dataplatform = DataPlateform()
dataplatform.add_repository(repository) dataplatform.add_repository("test", repository)
dataplatform.add_flux(name="copy_flux", flux=copy_flux) dataplatform.add_flux(name="copy_flux", flux=copy_flux)
assert dataplatform.fluxes == ["copy_flux"] assert dataplatform.fluxes == ["copy_flux"]
@ -105,7 +98,7 @@ def dataplatform(
) -> DataPlateform: ) -> DataPlateform:
dp = DataPlateform() dp = DataPlateform()
dp.add_repository(repository) dp.add_repository("test", repository)
dp.add_flux("foo", foo_flux) dp.add_flux("foo", foo_flux)
dp.add_flux("raw_brz_copy_username", copy_flux) dp.add_flux("raw_brz_copy_username", copy_flux)
@ -113,59 +106,27 @@ def dataplatform(
def test_listing_content(dataplatform: DataPlateform): def test_listing_content(dataplatform: DataPlateform):
assert dataplatform.repository("test").schemas() == ["test-raw", "test-bronze", "test-silver"] assert dataplatform.repository("test").schemas() == ["raw", "bronze", "silver"]
assert dataplatform.repository("test").schema("test-raw").tables == [ assert dataplatform.repository("test").schema("raw").tables == [
"test-raw-username", "username",
"test-raw-recovery", "recovery",
"test-raw-salary", "salary",
] ]
assert dataplatform.repository("test").table("test-raw-username").partitions == ["username.csv"] assert dataplatform.repository("test").table("raw", "username").partitions == ["username.csv"]
assert dataplatform.repository("test").table("test-raw-recovery").partitions == [ assert dataplatform.repository("test").table("raw", "recovery").partitions == [
"2022.csv", "2022.csv",
"2023.csv", "2023.csv",
"2024.csv", "2024.csv",
] ]
def test_content_from_graph(dataplatform: DataPlateform):
assert dataplatform.graph.nodes == {
Node(name="test-raw-recovery", infos={}),
Node(name="test-raw-salary", infos={}),
Node(name="test-raw-username", infos={}),
}
assert dataplatform.graphset.node_sets == {
frozenset(
{
Node(name="test-bronze-username"),
}
),
frozenset(
{
Node(name="test-bronze-foo"),
}
),
frozenset(
{
Node(name="test-raw-username"),
}
),
frozenset(
{
Node(name="test-raw-username"),
Node(name="test-raw-recovery"),
}
),
}
def test_execute_flux(dataplatform: DataPlateform): def test_execute_flux(dataplatform: DataPlateform):
meta = dataplatform.execute_flux("foo") meta = dataplatform.execute_flux("foo")
assert meta.data == {"who": "foo"} assert meta.data == {"who": "foo"}
assert dataplatform.repository("test").schema("test-bronze").tables == [] assert dataplatform.repository("test").schema("bronze").tables == []
meta = dataplatform.execute_flux("raw_brz_copy_username") meta = dataplatform.execute_flux("raw_brz_copy_username")
assert meta.data == {"src_size": 283, "tgt_size": 283} assert meta.data == {"src_size": 283, "tgt_size": 283}
assert dataplatform.repository("test").schema("test-bronze").tables == ["test-bronze-username"] assert dataplatform.repository("test").schema("bronze").tables == ["username"]

View File

@ -1,7 +1,6 @@
import pytest import pytest
from plesna.graph.graph import Graph from plesna.graph.graph import Edge, Graph, Node
from plesna.models.graphs import Edge, Node
def test_append_nodess(): def test_append_nodess():
@ -95,7 +94,9 @@ def test_get_sources_from(nodes, dag_edges):
assert graph.get_direct_sources_from(nodes["C"]) == set([nodes["A"], nodes["B"]]) assert graph.get_direct_sources_from(nodes["C"]) == set([nodes["A"], nodes["B"]])
assert graph.get_direct_sources_from(nodes["D"]) == set([nodes["C"]]) assert graph.get_direct_sources_from(nodes["D"]) == set([nodes["C"]])
assert graph.get_sources_from(nodes["D"]) == set([nodes["A"], nodes["B"], nodes["C"]]) assert graph.get_sources_from(nodes["D"]) == set(
[nodes["A"], nodes["B"], nodes["C"]]
)
def test_valid_dage(dag_edges, notdag_edges): def test_valid_dage(dag_edges, notdag_edges):

View File

@ -1,5 +1,4 @@
from plesna.graph.graph_set import GraphSet from plesna.graph.graph_set import EdgeOnSet, GraphSet, Node
from plesna.models.graphs import EdgeOnSet, Node
def test_init(): def test_init():

View File

@ -20,7 +20,7 @@ def location(tmp_path):
def test_init(location): def test_init(location):
repo = FSRepository("example", "example", location) repo = FSRepository("example", location, "example")
assert repo.ls() == [ assert repo.ls() == [
"schema", "schema",
] ]
@ -45,50 +45,29 @@ def test_init(location):
@pytest.fixture @pytest.fixture
def repository(location) -> FSRepository: def repository(location) -> FSRepository:
return FSRepository("repo_id", "example", location) return FSRepository("example", location, "example")
def test_list_schemas(repository): def test_list_schema(location, repository):
assert repository.schemas() == ["repo_id-schema"] assert repository.schemas() == ["schema"]
assert repository.schema("schema").name == "schema"
assert repository.schema("schema").id == str(location / "schema")
def test_describe_schema(location, repository): assert repository.schema("schema").repo_id == str(location)
schema = repository.schema("repo_id-schema") assert repository.schema("schema").value == str(location / "schema")
assert schema.name == "schema" assert repository.schema("schema").tables == ["username", "recovery", "salary"]
assert schema.id == "repo_id-schema"
assert schema.repo_id == "repo_id"
assert schema.value == str(location / "schema")
assert schema.tables == [
"repo_id-schema-username",
"repo_id-schema-recovery",
"repo_id-schema-salary",
]
def test_list_tables_schema(repository): def test_list_tables_schema(repository):
assert repository.schema("repo_id-schema").tables == [ assert repository.schema("schema").tables == ["username", "recovery", "salary"]
"repo_id-schema-username", assert repository.tables(schema="schema") == ["username", "recovery", "salary"]
"repo_id-schema-recovery",
"repo_id-schema-salary",
]
assert repository.tables("repo_id-schema") == [
"repo_id-schema-username",
"repo_id-schema-recovery",
"repo_id-schema-salary",
]
assert repository.tables() == [
"repo_id-schema-username",
"repo_id-schema-recovery",
"repo_id-schema-salary",
]
def test_describe_table(location, repository): def test_describe_table(location, repository):
table = repository.table("repo_id-schema-username") table = repository.table("schema", "username")
assert table.id == "repo_id-schema-username" assert table.id == str(location / "schema" / "username")
assert table.repo_id == "repo_id" assert table.repo_id == str(location)
assert table.schema_id == "repo_id-schema" assert table.schema_id == str(location / "schema")
assert table.name == "username" assert table.name == "username"
assert table.value == str(location / "schema" / "username") assert table.value == str(location / "schema" / "username")
assert table.partitions == ["username.csv"] assert table.partitions == ["username.csv"]
@ -96,11 +75,11 @@ def test_describe_table(location, repository):
def test_describe_table_with_partitions(location, repository): def test_describe_table_with_partitions(location, repository):
table = repository.table("repo_id-schema-recovery") table = repository.table("schema", "recovery")
assert table.id == "repo_id-schema-recovery" assert table.id == str(location / "schema" / "recovery")
assert table.repo_id == "repo_id" assert table.repo_id == str(location)
assert table.schema_id == "repo_id-schema" assert table.schema_id == str(location / "schema")
assert table.name == "recovery" assert table.name == "recovery"
assert table.value == str(location / "schema" / "recovery") assert table.value == str(location / "schema" / "recovery")
assert table.partitions == [ assert table.partitions == [