Compare commits

...

9 Commits

14 changed files with 331 additions and 208 deletions

View File

@ -3,6 +3,6 @@ from plesna.models.flux import Flux, FluxMetaData
def consume_flux(flux: Flux) -> FluxMetaData: def consume_flux(flux: Flux) -> FluxMetaData:
metadata = flux.transformation.function( metadata = flux.transformation.function(
sources=flux.sources, targets=flux.targets, **flux.transformation.extra_kwrds sources=flux.sources_dict, targets=flux.targets_dict, **flux.transformation.extra_kwrds
) )
return FluxMetaData(data=metadata) return FluxMetaData(data=metadata)

View File

@ -1,6 +1,8 @@
from plesna.compute.consume_flux import consume_flux from plesna.compute.consume_flux import consume_flux
from plesna.graph.graph_set import GraphSet from plesna.graph.graph import Graph, Node
from plesna.graph.graph_set import EdgeOnSet, GraphSet
from plesna.models.flux import Flux, FluxMetaData from plesna.models.flux import Flux, FluxMetaData
from plesna.models.libs.flux_graph import flux_to_edgeonset
from plesna.storage.repository.repository import Repository from plesna.storage.repository.repository import Repository
@ -11,31 +13,52 @@ class DataPlateformError(Exception):
class DataPlateform: class DataPlateform:
def __init__(self): def __init__(self):
self._graphset = GraphSet() self._graphset = GraphSet()
self._graph = Graph()
self._metadata_engine = "" self._metadata_engine = ""
self._fluxes = {} self._fluxes = {}
self._repositories = {} self._repositories = {}
def add_repository(self, name: str, repository: Repository) -> str: @property
if name in self._repositories: def graphset(self) -> GraphSet:
raise DataPlateformError("The repository {name} already exists") return self._graphset
self._repositories[name] = repository @property
return name def graph(self) -> Graph:
return self._graph
def repository_graph_feed(self, repository_id: str):
for schema in self._repositories[repository_id].schemas():
for table in self._repositories[repository_id].tables(schema):
self._graph.add_node(Node(name=table))
def add_repository(self, repository: Repository) -> str:
if repository.id in self._repositories:
raise DataPlateformError("The repository {repository.id} already exists")
self._repositories[repository.id] = repository
self.repository_graph_feed(repository.id)
return repository.id
@property @property
def repositories(self) -> list[str]: def repositories(self) -> list[str]:
return list(self._repositories) return list(self._repositories)
def repository(self, name: str) -> Repository: def repository(self, id: str) -> Repository:
return self._repositories[name] return self._repositories[id]
def add_flux(self, name: str, flux: Flux) -> str: def add_flux(self, name: str, flux: Flux) -> str:
if name in self._fluxes: if name in self._fluxes:
raise DataPlateformError("The flux {name} already exists") raise DataPlateformError("The flux {name} already exists")
self._fluxes[name] = flux self._fluxes[name] = flux
self.flux_graphset_feed(name)
return name return name
def flux_graphset_feed(self, flux_name: str):
flux = self.flux(flux_name)
edge = flux_to_edgeonset(flux)
self._graphset.append(edge)
@property @property
def fluxes(self) -> list[str]: def fluxes(self) -> list[str]:
return list(self._fluxes) return list(self._fluxes)

View File

@ -1,21 +1,6 @@
from functools import reduce
from pydantic import BaseModel from pydantic import BaseModel
from functools import reduce
from plesna.models.graphs import Node, Edge
class Node(BaseModel):
name: str
infos: dict = {}
def __hash__(self):
return hash(self.name)
class Edge(BaseModel):
arrow_name: str
source: Node
target: Node
edge_kwrds: dict = {}
class Graph: class Graph:

View File

@ -1,20 +1,4 @@
from typing import Callable from plesna.models.graphs import EdgeOnSet
from pydantic import BaseModel
class Node(BaseModel):
name: str
infos: dict = {}
def __hash__(self):
return hash(self.name)
class EdgeOnSet(BaseModel):
arrow: str
sources: list[Node]
targets: list[Node]
class GraphSet: class GraphSet:

View File

@ -1,14 +1,48 @@
from pydantic import BaseModel from collections.abc import Callable
from pydantic import BaseModel, computed_field
from plesna.models.storage import Table from plesna.models.storage import Table
from plesna.models.transformation import Transformation
class Transformation(BaseModel):
"""
The function have to have at least 2 arguments: sources and targets
Other arguments will came throught extra_kwrds
The function will have to return metadata as dict
"""
function: Callable
extra_kwrds: dict = {}
class Flux(BaseModel): class Flux(BaseModel):
sources: dict[str, Table] id: str
targets: dict[str, Table] name: str
sources: list[Table]
targets: list[Table]
transformation: Transformation transformation: Transformation
@computed_field
@property
def sources_dict(self) -> dict[str, Table]:
return {s.id: s for s in self.sources}
@computed_field
@property
def sources_id(self) -> dict[str, Table]:
return [s.id for s in self.sources]
@computed_field
@property
def targets_id(self) -> dict[str, Table]:
return [s.id for s in self.targets]
@computed_field
@property
def targets_dict(self) -> dict[str, Table]:
return {s.id: s for s in self.targets}
class FluxMetaData(BaseModel): class FluxMetaData(BaseModel):
data: dict data: dict

21
plesna/models/graphs.py Normal file
View File

@ -0,0 +1,21 @@
from pydantic import BaseModel
class Node(BaseModel):
name: str
def __hash__(self):
return hash(self.name)
class Edge(BaseModel):
arrow_name: str
source: Node
target: Node
edge_kwrds: dict = {}
class EdgeOnSet(BaseModel):
arrow: str
sources: list[Node]
targets: list[Node]

View File

@ -1,15 +0,0 @@
from collections.abc import Callable
from pydantic import BaseModel
class Transformation(BaseModel):
"""
The function have to have at least 2 arguments: sources and targets
Other arguments will came throught extra_kwrds
The function will have to return metadata as dict
"""
function: Callable
extra_kwrds: dict = {}

View File

@ -2,29 +2,16 @@ from pathlib import Path
from pydantic import BaseModel, computed_field from pydantic import BaseModel, computed_field
from plesna.libs.string_tools import extract_values_from_pattern
from plesna.models.storage import Partition, Schema, Table from plesna.models.storage import Partition, Schema, Table
from plesna.storage.repository.repository import Repository from plesna.storage.repository.repository import Repository
class FSPartition(BaseModel):
name: str
path: Path
@computed_field
@property
def ref(self) -> Partition:
return Partition(
id=str(self.path),
repo_id=str(self.path.parent.parent.parent),
schema_id=str(self.path.parent.parent),
table_id=str(self.path.parent),
name=self.name,
value=str(self.path.absolute()),
)
class FSTable(BaseModel): class FSTable(BaseModel):
name: str name: str
repo_id: str
schema_id: str
id: str
path: Path path: Path
is_partitionned: bool is_partitionned: bool
partitions: list[str] = [] partitions: list[str] = []
@ -38,9 +25,9 @@ class FSTable(BaseModel):
datas = [str(self.path.absolute())] datas = [str(self.path.absolute())]
return Table( return Table(
id=str(self.path), id=self.id,
repo_id=str(self.path.parent.parent), repo_id=self.repo_id,
schema_id=str(self.path.parent), schema_id=self.schema_id,
name=self.name, name=self.name,
value=str(self.path.absolute()), value=str(self.path.absolute()),
partitions=self.partitions, partitions=self.partitions,
@ -50,6 +37,8 @@ class FSTable(BaseModel):
class FSSchema(BaseModel): class FSSchema(BaseModel):
name: str name: str
repo_id: str
id: str
path: Path path: Path
tables: list[str] tables: list[str]
@ -57,14 +46,18 @@ class FSSchema(BaseModel):
@property @property
def ref(self) -> Schema: def ref(self) -> Schema:
return Schema( return Schema(
id=str(self.path), id=self.id,
repo_id=str(self.path.parent), repo_id=self.repo_id,
name=self.name, name=self.name,
value=str(self.path.absolute()), value=str(self.path.absolute()),
tables=self.tables, tables=self.tables,
) )
class FSRepositoryError(ValueError):
pass
class FSRepository(Repository): class FSRepository(Repository):
"""Repository based on files tree structure """Repository based on files tree structure
@ -74,11 +67,15 @@ class FSRepository(Repository):
""" """
def __init__(self, name: str, basepath: str, id: str): ID_FMT = {
self._basepath = Path(basepath) "schema": "{repo_id}-{schema_name}",
self.name = name "table": "{schema_id}-{table_name}",
self.id = id }
def __init__(self, id: str, name: str, basepath: str):
super().__init__(id, name)
self._basepath = Path(basepath)
assert self._basepath.exists() assert self._basepath.exists()
def ls(self, dir="", only_files=False, only_directories=False, recursive=False) -> list[str]: def ls(self, dir="", only_files=False, only_directories=False, recursive=False) -> list[str]:
@ -112,47 +109,89 @@ class FSRepository(Repository):
return [str(f.relative_to(dirpath)) for f in paths if not str(f).startswith(".")] return [str(f.relative_to(dirpath)) for f in paths if not str(f).startswith(".")]
def parse_id(self, string: str, id_type: str) -> dict:
if id_type not in self.ID_FMT:
raise FSRepositoryError(
"Wrong id_type. Gots {id_type} needs to be one of {self.ID_FMT.values}"
)
parsed = extract_values_from_pattern(self.ID_FMT[id_type], string)
if not parsed:
raise FSRepositoryError(
f"Wrong format for {id_type}. Got {string} need {self.ID_FMT['id_type']}"
)
return parsed
def schemas(self) -> list[str]: def schemas(self) -> list[str]:
"""List schemas (sub directories within basepath)""" """List schemas (sub directories within basepath)"""
subdirectories = self.ls("", only_directories=True) subdirectories = self.ls("", only_directories=True)
return [str(d) for d in subdirectories] return [
self.ID_FMT["schema"].format(repo_id=self.id, schema_name=d) for d in subdirectories
]
def _schema(self, name: str) -> FSSchema: def _schema(self, schema_id: str) -> FSSchema:
"""List schemas (sub directories within basepath)""" """List schemas (sub directories within basepath)"""
schema_path = self._basepath / name parsed = self.parse_id(schema_id, "schema")
tables = self.tables(schema=name)
return FSSchema(name=name, path=schema_path, tables=tables)
def schema(self, name: str) -> Schema: repo_id = parsed["repo_id"]
return self._schema(name).ref schema_name = parsed["schema_name"]
schema_path = self._basepath / schema_name
def tables(self, schema: str) -> list[str]: if repo_id != self.id:
tables = self.ls(schema) raise FSRepositoryError("Trying to get schema that don't belong in this repository")
tables = self.tables(schema_id)
return FSSchema(
name=schema_name,
id=schema_id,
repo_id=self.id,
schema_id=schema_id,
path=schema_path,
tables=tables,
)
def schema(self, schema_id: str) -> Schema:
return self._schema(schema_id).ref
def _tables(self, schema_id: str) -> list[str]:
parsed = self.parse_id(schema_id, "schema")
tables = self.ls(parsed["schema_name"])
return [self.ID_FMT["table"].format(table_name=t, schema_id=schema_id) for t in tables]
def tables(self, schema_id: str = "") -> list[str]:
if schema_id:
return self._tables(schema_id)
tables = []
for schema in self.schemas():
tables += self._tables(schema)
return tables return tables
def _table(self, schema: str, name: str) -> FSTable: def _table(self, table_id: str) -> FSTable:
"""Get infos on the table""" """Get infos on the table"""
table_path = self._basepath / schema / name parsed = self.parse_id(table_id, "table")
schema = self._schema(parsed["schema_id"])
if not schema.path.exists():
raise FSRepositoryError(f"The schema {schema.id} does not exists.")
table_subpath = f"{schema.name}/{parsed['table_name']}"
table_path = self._basepath / table_subpath
is_partitionned = table_path.is_dir() is_partitionned = table_path.is_dir()
if is_partitionned: if is_partitionned:
partitions = self.ls(f"{schema}/{name}", only_files=True) partitions = self.ls(table_subpath, only_files=True)
else: else:
partitions = [] partitions = []
return FSTable( return FSTable(
name=name, name=parsed["table_name"],
id=table_id,
repo_id=self.id,
schema_id=schema.id,
path=table_path, path=table_path,
is_partitionned=is_partitionned, is_partitionned=is_partitionned,
partitions=partitions, partitions=partitions,
) )
def table(self, schema: str, name: str) -> Table: def table(self, table_id: str) -> Table:
return self._table(schema, name).ref return self._table(table_id).ref
def _partition(self, schema: str, table: str, partition: str) -> FSPartition:
"""Get infos on the partition"""
table_path = self._basepath / schema / table
return FSPartition(name=partition, table_path=table_path)
def partition(self, schema: str, name: str) -> Partition:
return self._partition(schema, name).ref

View File

@ -4,35 +4,34 @@ from plesna.models.storage import Partition, Schema, Table
class Repository: class Repository:
def __init__(self): def __init__(self, id: str, name: str):
pass self._id = id
self._name = name
@property
def id(self) -> str:
return self._id
@property
def name(self) -> str:
return self._name
@abc.abstractmethod @abc.abstractmethod
def schemas(self) -> list[str]: def schemas(self) -> list[str]:
"""List schema's names""" """List schema's ids"""
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def schema(self, name: str) -> Schema: def schema(self, schema_id: str) -> Schema:
"""Get the schema properties""" """Get the schema properties"""
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def tables(self, schema: str) -> list[str]: def tables(self, schema_id: str) -> list[str]:
"""List table's name in schema""" """List table's name in schema (the id)"""
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def table(self, schema: str, name: str) -> Table: def table(self, table_id: str) -> Table:
"""Get the table properties""" """Get the table properties (the id)"""
raise NotImplementedError
@abc.abstractmethod
def partitions(self, schema: str, table: str) -> list[str]:
"""List partition's name in table"""
raise NotImplementedError
@abc.abstractmethod
def partition(self, schema: str, name: str, partition: str) -> Partition:
"""Get the partition properties"""
raise NotImplementedError raise NotImplementedError

View File

@ -1,26 +1,17 @@
from plesna.compute.consume_flux import consume_flux from plesna.compute.consume_flux import consume_flux
from plesna.models.flux import Flux from plesna.models.flux import Flux, Transformation
from plesna.models.storage import Table from plesna.models.storage import Table
from plesna.models.transformation import Transformation
def test_consume_flux(): def test_consume_flux():
sources = { sources = [
"src1": Table( Table(id="src1", repo_id="test", schema_id="test", name="test", value="here", datas=["d"]),
id="src1", repo_id="test", schema_id="test", name="test", value="here", datas=["d"] Table(id="src2", repo_id="test", schema_id="test", name="test", value="here", datas=["d"]),
), ]
"src2": Table( targets = [
id="src2", repo_id="test", schema_id="test", name="test", value="here", datas=["d"] Table(id="tgt1", repo_id="test", schema_id="test", name="test", value="this", datas=["d"]),
), Table(id="tgt2", repo_id="test", schema_id="test", name="test", value="that", datas=["d"]),
} ]
targets = {
"tgt1": Table(
id="tgt1", repo_id="test", schema_id="test", name="test", value="this", datas=["d"]
),
"tgt2": Table(
id="tgt2", repo_id="test", schema_id="test", name="test", value="that", datas=["d"]
),
}
def func(sources, targets, **kwrds): def func(sources, targets, **kwrds):
return { return {
@ -30,6 +21,8 @@ def test_consume_flux():
} }
flux = Flux( flux = Flux(
id="flux",
name="flux",
sources=sources, sources=sources,
targets=targets, targets=targets,
transformation=Transformation(function=func, extra_kwrds={"extra": "super"}), transformation=Transformation(function=func, extra_kwrds={"extra": "super"}),

View File

@ -4,8 +4,8 @@ from pathlib import Path
import pytest import pytest
from plesna.dataplatform import DataPlateform from plesna.dataplatform import DataPlateform
from plesna.models.flux import Flux from plesna.models.graphs import Node
from plesna.models.transformation import Transformation from plesna.models.flux import Flux, Transformation
from plesna.storage.repository.fs_repository import FSRepository from plesna.storage.repository.fs_repository import FSRepository
FIXTURE_DIR = Path(__file__).parent.parent / Path("raw_datas") FIXTURE_DIR = Path(__file__).parent.parent / Path("raw_datas")
@ -24,52 +24,36 @@ def repository(tmp_path) -> FSRepository:
silver_path = Path(tmp_path) / "silver" silver_path = Path(tmp_path) / "silver"
silver_path.mkdir() silver_path.mkdir()
return FSRepository("test", tmp_path, "test") return FSRepository("test", "test", tmp_path)
def test_add_repository( def test_add_repository(
repository: FSRepository, repository: FSRepository,
): ):
dp = DataPlateform() dp = DataPlateform()
dp.add_repository("test", repository) dp.add_repository(repository)
assert dp.repositories == ["test"] assert dp.repositories == ["test"]
assert dp.repository("test") == repository assert dp.repository("test") == repository
@pytest.fixture
def foo_flux(repository: FSRepository) -> Flux:
src = {"username": repository.table("raw", "username")}
targets = {"username": repository.table("bronze", "username")}
def foo(sources, targets):
return {"who": "foo"}
extra_kwrds = {}
flux = Flux(
sources=src,
targets=targets,
transformation=Transformation(function=foo, extra_kwrds=extra_kwrds),
)
return flux
@pytest.fixture @pytest.fixture
def copy_flux(repository: FSRepository) -> Flux: def copy_flux(repository: FSRepository) -> Flux:
raw_username = {"username": repository.table("raw", "username")} raw_username = [repository.table("test-raw-username")]
bronze_username = {"username": repository.table("bronze", "username")} bronze_username = [repository.table("test-bronze-username")]
def copy(sources, targets): def copy(sources, targets):
src_path = Path(sources["username"].datas[0]) src_path = Path(sources["test-raw-username"].datas[0])
tgt_path = Path(targets["username"].datas[0]) tgt_path = Path(targets["test-bronze-username"].datas[0])
shutil.copy(src_path, tgt_path) shutil.copy(src_path, tgt_path)
return {"src_size": src_path.stat().st_size, "tgt_size": tgt_path.stat().st_size} return {"src_size": src_path.stat().st_size, "tgt_size": tgt_path.stat().st_size}
extra_kwrds = {} extra_kwrds = {}
raw_brz_copy_username = Flux( raw_brz_copy_username = Flux(
id="copy_flux",
name="copy",
sources=raw_username, sources=raw_username,
targets=bronze_username, targets=bronze_username,
transformation=Transformation(function=copy, extra_kwrds=extra_kwrds), transformation=Transformation(function=copy, extra_kwrds=extra_kwrds),
@ -77,9 +61,32 @@ def copy_flux(repository: FSRepository) -> Flux:
return raw_brz_copy_username return raw_brz_copy_username
@pytest.fixture
def foo_flux(repository: FSRepository) -> Flux:
src = [
repository.table("test-raw-username"),
repository.table("test-raw-recovery"),
]
targets = [repository.table("test-bronze-foo")]
def foo(sources, targets):
return {"who": "foo"}
extra_kwrds = {}
flux = Flux(
id="foo_flux",
name="foo",
sources=src,
targets=targets,
transformation=Transformation(function=foo, extra_kwrds=extra_kwrds),
)
return flux
def test_add_flux(repository: FSRepository, copy_flux: Flux): def test_add_flux(repository: FSRepository, copy_flux: Flux):
dataplatform = DataPlateform() dataplatform = DataPlateform()
dataplatform.add_repository("test", repository) dataplatform.add_repository(repository)
dataplatform.add_flux(name="copy_flux", flux=copy_flux) dataplatform.add_flux(name="copy_flux", flux=copy_flux)
assert dataplatform.fluxes == ["copy_flux"] assert dataplatform.fluxes == ["copy_flux"]
@ -98,7 +105,7 @@ def dataplatform(
) -> DataPlateform: ) -> DataPlateform:
dp = DataPlateform() dp = DataPlateform()
dp.add_repository("test", repository) dp.add_repository(repository)
dp.add_flux("foo", foo_flux) dp.add_flux("foo", foo_flux)
dp.add_flux("raw_brz_copy_username", copy_flux) dp.add_flux("raw_brz_copy_username", copy_flux)
@ -106,27 +113,59 @@ def dataplatform(
def test_listing_content(dataplatform: DataPlateform): def test_listing_content(dataplatform: DataPlateform):
assert dataplatform.repository("test").schemas() == ["raw", "bronze", "silver"] assert dataplatform.repository("test").schemas() == ["test-raw", "test-bronze", "test-silver"]
assert dataplatform.repository("test").schema("raw").tables == [ assert dataplatform.repository("test").schema("test-raw").tables == [
"username", "test-raw-username",
"recovery", "test-raw-recovery",
"salary", "test-raw-salary",
] ]
assert dataplatform.repository("test").table("raw", "username").partitions == ["username.csv"] assert dataplatform.repository("test").table("test-raw-username").partitions == ["username.csv"]
assert dataplatform.repository("test").table("raw", "recovery").partitions == [ assert dataplatform.repository("test").table("test-raw-recovery").partitions == [
"2022.csv", "2022.csv",
"2023.csv", "2023.csv",
"2024.csv", "2024.csv",
] ]
def test_content_from_graph(dataplatform: DataPlateform):
assert dataplatform.graph.nodes == {
Node(name="test-raw-recovery", infos={}),
Node(name="test-raw-salary", infos={}),
Node(name="test-raw-username", infos={}),
}
assert dataplatform.graphset.node_sets == {
frozenset(
{
Node(name="test-bronze-username"),
}
),
frozenset(
{
Node(name="test-bronze-foo"),
}
),
frozenset(
{
Node(name="test-raw-username"),
}
),
frozenset(
{
Node(name="test-raw-username"),
Node(name="test-raw-recovery"),
}
),
}
def test_execute_flux(dataplatform: DataPlateform): def test_execute_flux(dataplatform: DataPlateform):
meta = dataplatform.execute_flux("foo") meta = dataplatform.execute_flux("foo")
assert meta.data == {"who": "foo"} assert meta.data == {"who": "foo"}
assert dataplatform.repository("test").schema("bronze").tables == [] assert dataplatform.repository("test").schema("test-bronze").tables == []
meta = dataplatform.execute_flux("raw_brz_copy_username") meta = dataplatform.execute_flux("raw_brz_copy_username")
assert meta.data == {"src_size": 283, "tgt_size": 283} assert meta.data == {"src_size": 283, "tgt_size": 283}
assert dataplatform.repository("test").schema("bronze").tables == ["username"] assert dataplatform.repository("test").schema("test-bronze").tables == ["test-bronze-username"]

View File

@ -1,6 +1,7 @@
import pytest import pytest
from plesna.graph.graph import Edge, Graph, Node from plesna.graph.graph import Graph
from plesna.models.graphs import Edge, Node
def test_append_nodess(): def test_append_nodess():
@ -94,9 +95,7 @@ def test_get_sources_from(nodes, dag_edges):
assert graph.get_direct_sources_from(nodes["C"]) == set([nodes["A"], nodes["B"]]) assert graph.get_direct_sources_from(nodes["C"]) == set([nodes["A"], nodes["B"]])
assert graph.get_direct_sources_from(nodes["D"]) == set([nodes["C"]]) assert graph.get_direct_sources_from(nodes["D"]) == set([nodes["C"]])
assert graph.get_sources_from(nodes["D"]) == set( assert graph.get_sources_from(nodes["D"]) == set([nodes["A"], nodes["B"], nodes["C"]])
[nodes["A"], nodes["B"], nodes["C"]]
)
def test_valid_dage(dag_edges, notdag_edges): def test_valid_dage(dag_edges, notdag_edges):

View File

@ -1,4 +1,5 @@
from plesna.graph.graph_set import EdgeOnSet, GraphSet, Node from plesna.graph.graph_set import GraphSet
from plesna.models.graphs import EdgeOnSet, Node
def test_init(): def test_init():

View File

@ -20,7 +20,7 @@ def location(tmp_path):
def test_init(location): def test_init(location):
repo = FSRepository("example", location, "example") repo = FSRepository("example", "example", location)
assert repo.ls() == [ assert repo.ls() == [
"schema", "schema",
] ]
@ -45,29 +45,50 @@ def test_init(location):
@pytest.fixture @pytest.fixture
def repository(location) -> FSRepository: def repository(location) -> FSRepository:
return FSRepository("example", location, "example") return FSRepository("repo_id", "example", location)
def test_list_schema(location, repository): def test_list_schemas(repository):
assert repository.schemas() == ["schema"] assert repository.schemas() == ["repo_id-schema"]
assert repository.schema("schema").name == "schema"
assert repository.schema("schema").id == str(location / "schema")
assert repository.schema("schema").repo_id == str(location) def test_describe_schema(location, repository):
assert repository.schema("schema").value == str(location / "schema") schema = repository.schema("repo_id-schema")
assert repository.schema("schema").tables == ["username", "recovery", "salary"] assert schema.name == "schema"
assert schema.id == "repo_id-schema"
assert schema.repo_id == "repo_id"
assert schema.value == str(location / "schema")
assert schema.tables == [
"repo_id-schema-username",
"repo_id-schema-recovery",
"repo_id-schema-salary",
]
def test_list_tables_schema(repository): def test_list_tables_schema(repository):
assert repository.schema("schema").tables == ["username", "recovery", "salary"] assert repository.schema("repo_id-schema").tables == [
assert repository.tables(schema="schema") == ["username", "recovery", "salary"] "repo_id-schema-username",
"repo_id-schema-recovery",
"repo_id-schema-salary",
]
assert repository.tables("repo_id-schema") == [
"repo_id-schema-username",
"repo_id-schema-recovery",
"repo_id-schema-salary",
]
assert repository.tables() == [
"repo_id-schema-username",
"repo_id-schema-recovery",
"repo_id-schema-salary",
]
def test_describe_table(location, repository): def test_describe_table(location, repository):
table = repository.table("schema", "username") table = repository.table("repo_id-schema-username")
assert table.id == str(location / "schema" / "username") assert table.id == "repo_id-schema-username"
assert table.repo_id == str(location) assert table.repo_id == "repo_id"
assert table.schema_id == str(location / "schema") assert table.schema_id == "repo_id-schema"
assert table.name == "username" assert table.name == "username"
assert table.value == str(location / "schema" / "username") assert table.value == str(location / "schema" / "username")
assert table.partitions == ["username.csv"] assert table.partitions == ["username.csv"]
@ -75,11 +96,11 @@ def test_describe_table(location, repository):
def test_describe_table_with_partitions(location, repository): def test_describe_table_with_partitions(location, repository):
table = repository.table("schema", "recovery") table = repository.table("repo_id-schema-recovery")
assert table.id == str(location / "schema" / "recovery") assert table.id == "repo_id-schema-recovery"
assert table.repo_id == str(location) assert table.repo_id == "repo_id"
assert table.schema_id == str(location / "schema") assert table.schema_id == "repo_id-schema"
assert table.name == "recovery" assert table.name == "recovery"
assert table.value == str(location / "schema" / "recovery") assert table.value == str(location / "schema" / "recovery")
assert table.partitions == [ assert table.partitions == [