Feat: use id in repository

This commit is contained in:
2025-01-05 11:27:52 +01:00
parent b9dade2701
commit 48964ad561
3 changed files with 97 additions and 54 deletions

View File

@@ -2,6 +2,7 @@ from pathlib import Path
from pydantic import BaseModel, computed_field from pydantic import BaseModel, computed_field
from plesna.libs.string_tools import extract_values_from_pattern
from plesna.models.storage import Partition, Schema, Table from plesna.models.storage import Partition, Schema, Table
from plesna.storage.repository.repository import Repository from plesna.storage.repository.repository import Repository
@@ -25,6 +26,7 @@ class FSPartition(BaseModel):
class FSTable(BaseModel): class FSTable(BaseModel):
name: str name: str
id: str
path: Path path: Path
is_partitionned: bool is_partitionned: bool
partitions: list[str] = [] partitions: list[str] = []
@@ -38,7 +40,7 @@ class FSTable(BaseModel):
datas = [str(self.path.absolute())] datas = [str(self.path.absolute())]
return Table( return Table(
id=str(self.path), id=self.id,
repo_id=str(self.path.parent.parent), repo_id=str(self.path.parent.parent),
schema_id=str(self.path.parent), schema_id=str(self.path.parent),
name=self.name, name=self.name,
@@ -50,6 +52,7 @@ class FSTable(BaseModel):
class FSSchema(BaseModel): class FSSchema(BaseModel):
name: str name: str
id: str
path: Path path: Path
tables: list[str] tables: list[str]
@@ -57,7 +60,7 @@ class FSSchema(BaseModel):
@property @property
def ref(self) -> Schema: def ref(self) -> Schema:
return Schema( return Schema(
id=str(self.path), id=self.id,
repo_id=str(self.path.parent), repo_id=str(self.path.parent),
name=self.name, name=self.name,
value=str(self.path.absolute()), value=str(self.path.absolute()),
@@ -65,6 +68,10 @@ class FSSchema(BaseModel):
) )
class FSRepositoryError(ValueError):
pass
class FSRepository(Repository): class FSRepository(Repository):
"""Repository based on files tree structure """Repository based on files tree structure
@@ -74,6 +81,11 @@ class FSRepository(Repository):
""" """
ID_FMT = {
"schema": "{repo_name}-{schema_name}",
"table": "{repo_name}-{schema_name}-{table_name}",
}
def __init__(self, name: str, basepath: str, id: str): def __init__(self, name: str, basepath: str, id: str):
self._basepath = Path(basepath) self._basepath = Path(basepath)
self.name = name self.name = name
@@ -112,47 +124,83 @@ class FSRepository(Repository):
return [str(f.relative_to(dirpath)) for f in paths if not str(f).startswith(".")] return [str(f.relative_to(dirpath)) for f in paths if not str(f).startswith(".")]
def parse_id(self, string: str, id_type: str) -> dict:
if id_type not in self.ID_FMT:
raise FSRepositoryError(
"Wrong id_type. Gots {id_type} needs to be one of {self.ID_FMT.values}"
)
parsed = extract_values_from_pattern(self.ID_FMT[id_type], string)
if not parsed:
raise FSRepositoryError(
f"Wrong format for {id_type}. Got {string} need {self.ID_FMT['id_type']}"
)
return parsed
def schemas(self) -> list[str]: def schemas(self) -> list[str]:
"""List schemas (sub directories within basepath)""" """List schemas (sub directories within basepath)"""
subdirectories = self.ls("", only_directories=True) subdirectories = self.ls("", only_directories=True)
return [str(d) for d in subdirectories] return [
self.ID_FMT["schema"].format(repo_name=self.name, schema_name=d) for d in subdirectories
]
def _schema(self, name: str) -> FSSchema: def _schema(self, schema_id: str) -> FSSchema:
"""List schemas (sub directories within basepath)""" """List schemas (sub directories within basepath)"""
schema_path = self._basepath / name parsed = self.parse_id(schema_id, "schema")
tables = self.tables(schema=name)
return FSSchema(name=name, path=schema_path, tables=tables)
def schema(self, name: str) -> Schema: repo_name = parsed["repo_name"]
return self._schema(name).ref schema_name = parsed["schema_name"]
def tables(self, schema: str) -> list[str]: if repo_name != self.name:
tables = self.ls(schema) raise FSRepositoryError("Trying to get schema that don't belong in this repository")
schema_path = self._basepath / schema_name
if not schema_path.exists():
raise FSRepositoryError(f"The schema {schema_name} does not exists")
tables = self.tables(schema_id)
return FSSchema(name=schema_name, id=schema_id, path=schema_path, tables=tables)
def schema(self, schema_id: str) -> Schema:
return self._schema(schema_id).ref
def _tables(self, schema_id: str) -> list[str]:
parsed = self.parse_id(schema_id, "schema")
tables = self.ls(parsed["schema_name"])
return tables return tables
def _table(self, schema: str, name: str) -> FSTable: def tables(self, schema_id: str = "") -> list[str]:
if schema_id:
return self._tables(schema_id)
tables = []
for schema in self.schemas():
tables += self._tables(schema)
return tables
def _table(self, table_id: str) -> FSTable:
"""Get infos on the table""" """Get infos on the table"""
table_path = self._basepath / schema / name parsed = self.parse_id(table_id, "table")
if parsed["repo_name"] != self.name:
raise FSRepositoryError("Trying to get table that don't belong in this repository")
table_subpath = f"{parsed['schema_name']}/{parsed['table_name']}"
table_path = self._basepath / table_subpath
if not table_path.exists():
raise FSRepositoryError(f"The table {parsed['table_name']} does not exists")
is_partitionned = table_path.is_dir() is_partitionned = table_path.is_dir()
if is_partitionned: if is_partitionned:
partitions = self.ls(f"{schema}/{name}", only_files=True) partitions = self.ls(table_subpath, only_files=True)
else: else:
partitions = [] partitions = []
return FSTable( return FSTable(
name=name, name=parsed["table_name"],
id=table_id,
path=table_path, path=table_path,
is_partitionned=is_partitionned, is_partitionned=is_partitionned,
partitions=partitions, partitions=partitions,
) )
def table(self, schema: str, name: str) -> Table: def table(self, table_id: str) -> Table:
return self._table(schema, name).ref return self._table(table_id).ref
def _partition(self, schema: str, table: str, partition: str) -> FSPartition:
"""Get infos on the partition"""
table_path = self._basepath / schema / table
return FSPartition(name=partition, table_path=table_path)
def partition(self, schema: str, name: str) -> Partition:
return self._partition(schema, name).ref

View File

@@ -9,30 +9,20 @@ class Repository:
@abc.abstractmethod @abc.abstractmethod
def schemas(self) -> list[str]: def schemas(self) -> list[str]:
"""List schema's names""" """List schema's ids"""
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def schema(self, name: str) -> Schema: def schema(self, schema_id: str) -> Schema:
"""Get the schema properties""" """Get the schema properties"""
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def tables(self, schema: str) -> list[str]: def tables(self, schema_id: str) -> list[str]:
"""List table's name in schema""" """List table's name in schema (the id)"""
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def table(self, schema: str, name: str) -> Table: def table(self, table_id) -> Table:
"""Get the table properties""" """Get the table properties (the id)"""
raise NotImplementedError
@abc.abstractmethod
def partitions(self, schema: str, table: str) -> list[str]:
"""List partition's name in table"""
raise NotImplementedError
@abc.abstractmethod
def partition(self, schema: str, name: str, partition: str) -> Partition:
"""Get the partition properties"""
raise NotImplementedError raise NotImplementedError

View File

@@ -48,24 +48,29 @@ def repository(location) -> FSRepository:
return FSRepository("example", location, "example") return FSRepository("example", location, "example")
def test_list_schema(location, repository): def test_list_schemas(repository):
assert repository.schemas() == ["schema"] assert repository.schemas() == ["example-schema"]
assert repository.schema("schema").name == "schema"
assert repository.schema("schema").id == str(location / "schema")
assert repository.schema("schema").repo_id == str(location) def test_describe_schema(location, repository):
assert repository.schema("schema").value == str(location / "schema") schema = repository.schema("example-schema")
assert repository.schema("schema").tables == ["username", "recovery", "salary"] assert schema.name == "schema"
assert schema.id == "example-schema"
assert schema.repo_id == str(location)
assert schema.value == str(location / "schema")
assert schema.tables == ["username", "recovery", "salary"]
def test_list_tables_schema(repository): def test_list_tables_schema(repository):
assert repository.schema("schema").tables == ["username", "recovery", "salary"] assert repository.schema("example-schema").tables == ["username", "recovery", "salary"]
assert repository.tables(schema="schema") == ["username", "recovery", "salary"] assert repository.tables("example-schema") == ["username", "recovery", "salary"]
assert repository.tables() == ["username", "recovery", "salary"]
def test_describe_table(location, repository): def test_describe_table(location, repository):
table = repository.table("schema", "username") table = repository.table("example-schema-username")
assert table.id == str(location / "schema" / "username") assert table.id == "example-schema-username"
assert table.repo_id == str(location) assert table.repo_id == str(location)
assert table.schema_id == str(location / "schema") assert table.schema_id == str(location / "schema")
assert table.name == "username" assert table.name == "username"
@@ -75,9 +80,9 @@ def test_describe_table(location, repository):
def test_describe_table_with_partitions(location, repository): def test_describe_table_with_partitions(location, repository):
table = repository.table("schema", "recovery") table = repository.table("example-schema-recovery")
assert table.id == str(location / "schema" / "recovery") assert table.id == "example-schema-recovery"
assert table.repo_id == str(location) assert table.repo_id == str(location)
assert table.schema_id == str(location / "schema") assert table.schema_id == str(location / "schema")
assert table.name == "recovery" assert table.name == "recovery"