153 lines
4.3 KiB
Python
153 lines
4.3 KiB
Python
from pathlib import Path
|
|
|
|
from pydantic import BaseModel, computed_field
|
|
|
|
from plesna.models.storage import Partition, Schema, Table
|
|
from plesna.storage.repository.repository import Repository
|
|
|
|
|
|
class FSPartition(BaseModel):
    """Filesystem view of a single partition.

    The identifiers exposed through ``ref`` are taken from ``path`` and its
    ancestors: the direct parent is used as the table id, the grandparent as
    the schema id and the great-grandparent as the repository id.
    """

    name: str
    path: Path

    @computed_field
    @property
    def ref(self) -> Partition:
        """Build the ``Partition`` model describing this filesystem entry."""
        table_dir = self.path.parent
        schema_dir = table_dir.parent
        repo_dir = schema_dir.parent

        return Partition(
            id=str(self.path),
            repo_id=str(repo_dir),
            schema_id=str(schema_dir),
            table_id=str(table_dir),
            name=self.name,
            value=str(self.path.absolute()),
        )
|
|
|
|
|
|
class FSTable(BaseModel):
    """Filesystem view of a table.

    ``partitions`` holds the partition names forwarded to the ``Table``
    model; ``is_partitionned`` is kept on this model only and is not passed
    to ``Table``.
    """

    name: str
    path: Path
    is_partitionned: bool
    partitions: list[str] = []

    @computed_field
    @property
    def ref(self) -> Table:
        """Build the ``Table`` model describing this filesystem entry."""
        # The direct parent is used as the schema id, its parent as the
        # repository id.
        schema_dir = self.path.parent
        repo_dir = schema_dir.parent

        return Table(
            id=str(self.path),
            repo_id=str(repo_dir),
            schema_id=str(schema_dir),
            name=self.name,
            value=str(self.path.absolute()),
            partitions=self.partitions,
        )
|
|
|
|
|
|
class FSSchema(BaseModel):
    """Filesystem view of a schema.

    ``tables`` holds the table names forwarded to the ``Schema`` model.
    """

    name: str
    path: Path
    tables: list[str]

    @computed_field
    @property
    def ref(self) -> Schema:
        """Build the ``Schema`` model describing this filesystem entry."""
        # The direct parent of the schema path is used as the repository id.
        repo_dir = self.path.parent

        return Schema(
            id=str(self.path),
            repo_id=str(repo_dir),
            name=self.name,
            value=str(self.path.absolute()),
            tables=self.tables,
        )
|
|
|
|
|
|
class FSRepository(Repository):
    """Repository based on a files tree structure.

    - first level: schemas
    - second level: tables
    - third level: partitions (actual data)
    """

    def __init__(self, name: str, basepath: str, id: str):
        """Create a repository rooted at ``basepath``.

        :param name: name of the repository
        :param basepath: root directory of the repository; it must exist
        :param id: identifier of the repository
        :raises AssertionError: if ``basepath`` does not exist
        """
        self._basepath = Path(basepath)
        self.name = name
        self.id = id

        # NOTE(review): this check is skipped when Python runs with -O. It is
        # kept as an assert so the exception type seen by callers is unchanged.
        assert self._basepath.exists()

    def ls(
        self, dir="", only_files=False, only_directories=False, recursive=False
    ) -> list[str]:
        """List the content of a directory of the repository.

        Hidden entries are skipped: an entry is hidden when any component of
        its path relative to ``dir`` starts with a dot.

        :param dir: relative path from self._basepath
        :param only_files: if true return only files (takes precedence over
            ``only_directories`` when both are set)
        :param only_directories: if true return only directories
        :param recursive: if true list the content of sub directories too
        :return: list of strings describing paths relative to
            self._basepath / dir
        """
        dirpath = self._basepath / dir
        candidates = dirpath.rglob("*") if recursive else dirpath.iterdir()

        listed = []
        for entry in candidates:
            relative = entry.relative_to(dirpath)
            # Test the relative components, not the full path: the full path
            # starts with the base path, so it says nothing about the entry.
            if any(part.startswith(".") for part in relative.parts):
                continue

            if only_files:
                keep = not entry.is_dir()
            elif only_directories:
                keep = entry.is_dir()
            else:
                keep = True

            if keep:
                listed.append(str(relative))
        return listed

    def schemas(self) -> list[str]:
        """List schemas (sub directories within basepath)."""
        return self.ls("", only_directories=True)

    def _schema(self, name: str) -> FSSchema:
        """Get infos on the schema.

        Every visible entry of the schema directory (file or directory) is
        reported as a table.
        """
        schema_path = self._basepath / name
        tables = self.ls(name)
        return FSSchema(name=name, path=schema_path, tables=tables)

    def schema(self, name: str) -> Schema:
        """Return the ``Schema`` model of the schema called ``name``."""
        return self._schema(name).ref

    def _table(self, schema: str, name: str) -> FSTable:
        """Get infos on the table.

        A table stored as a directory is partitioned and its partitions are
        the files it contains; any other table has no partition.
        """
        table_path = self._basepath / schema / name
        is_partitionned = table_path.is_dir()
        if is_partitionned:
            partitions = self.ls(f"{schema}/{name}", only_files=True)
        else:
            partitions = []

        return FSTable(
            name=name,
            path=table_path,
            is_partitionned=is_partitionned,
            partitions=partitions,
        )

    def table(self, schema: str, name: str) -> Table:
        """Return the ``Table`` model of table ``name`` in ``schema``."""
        return self._table(schema, name).ref

    def _partition(self, schema: str, table: str, partition: str) -> FSPartition:
        """Get infos on the partition.

        The partition path is ``basepath / schema / table / partition``, which
        is the layout ``FSPartition.ref`` relies on to read the table, schema
        and repository ids from the ancestors of the path.
        """
        partition_path = self._basepath / schema / table / partition
        return FSPartition(name=partition, path=partition_path)

    def partition(self, schema: str, name: str, partition: str) -> Partition:
        """Return the ``Partition`` model of a partition.

        :param schema: name of the schema
        :param name: name of the table holding the partition
        :param partition: name of the partition inside the table
        """
        # NOTE(review): the third parameter is new. The previous two-argument
        # form could not work because it called _partition without the
        # partition name. Confirm against the abstract Repository signature.
        return self._partition(schema, name, partition).ref
|