plesna/plesna/storage/repository/fs_repository.py

159 lines
4.6 KiB
Python

from pathlib import Path
from pydantic import BaseModel, computed_field
from plesna.models.storage import Partition, Schema, Table
from plesna.storage.repository.repository import Repository
# Filesystem view of a single partition: a name plus the path it lives at.
# Documentation is kept in comments (not docstrings) so the generated
# pydantic schema is unaffected.
class FSPartition(BaseModel):
    name: str
    path: Path

    # Storage-level ``Partition`` built from this path. The ids are the string
    # forms of the path and of its ancestors: parent -> table,
    # grandparent -> schema, great-grandparent -> repository.
    @computed_field
    @property
    def ref(self) -> Partition:
        table_dir = self.path.parent
        schema_dir = table_dir.parent
        repo_dir = schema_dir.parent
        return Partition(
            id=str(self.path),
            repo_id=str(repo_dir),
            schema_id=str(schema_dir),
            table_id=str(table_dir),
            name=self.name,
            value=str(self.path.absolute()),
        )
# Filesystem view of a table: its name, its path, whether it is split into
# partitions and, if so, the partition names.
# Documentation is kept in comments (not docstrings) so the generated
# pydantic schema is unaffected.
class FSTable(BaseModel):
    name: str
    path: Path
    is_partitionned: bool
    partitions: list[str] = []

    # Storage-level ``Table`` built from this path. ``datas`` holds one
    # absolute path per partition for a partitioned table, otherwise the
    # absolute path of the table itself.
    @computed_field
    @property
    def ref(self) -> Table:
        absolute_path = self.path.absolute()
        datas = (
            [str(absolute_path / partition) for partition in self.partitions]
            if self.is_partitionned
            else [str(absolute_path)]
        )
        schema_dir = self.path.parent
        return Table(
            id=str(self.path),
            repo_id=str(schema_dir.parent),
            schema_id=str(schema_dir),
            name=self.name,
            value=str(absolute_path),
            partitions=self.partitions,
            datas=datas,
        )
# Filesystem view of a schema: its name, its path and the names of the
# tables it contains.
# Documentation is kept in comments (not docstrings) so the generated
# pydantic schema is unaffected.
class FSSchema(BaseModel):
    name: str
    path: Path
    tables: list[str]

    # Storage-level ``Schema`` built from this path; the repository id is the
    # string form of the parent of the schema path.
    @computed_field
    @property
    def ref(self) -> Schema:
        schema_id = str(self.path)
        repo_id = str(self.path.parent)
        absolute_path = str(self.path.absolute())
        return Schema(
            id=schema_id,
            repo_id=repo_id,
            name=self.name,
            value=absolute_path,
            tables=self.tables,
        )
class FSRepository(Repository):
    """Repository backed by a directory tree.

    The tree below ``basepath`` is read as three levels:

    - first level: schemas
    - second level: tables
    - third level: partitions (the actual data)
    """

    def __init__(self, name: str, basepath: str, id: str):
        """Bind the repository to an existing directory.

        :param name: name of the repository
        :param basepath: root directory of the repository
        :param id: identifier of the repository
        :raises AssertionError: if ``basepath`` does not exist
        """
        self._basepath = Path(basepath)
        self.name = name
        self.id = id
        # Raised explicitly instead of through ``assert`` so the check is not
        # stripped under ``python -O``; the exception type is unchanged.
        if not self._basepath.exists():
            raise AssertionError(f"basepath does not exist: {self._basepath}")

    def ls(
        self,
        dir: str = "",
        only_files: bool = False,
        only_directories: bool = False,
        recursive: bool = False,
    ) -> list[str]:
        """List the content of a directory of the repository.

        Hidden entries are skipped: any entry whose path, relative to the
        listed directory, has a component starting with ``"."``.

        :param dir: path of the directory, relative to the repository basepath
        :param only_files: if true, return only entries that are not
            directories (takes precedence over ``only_directories``)
        :param only_directories: if true, return only directories
        :param recursive: if true, also list the content of sub directories
        :return: paths of the entries, relative to ``basepath / dir``
        """
        dirpath = self._basepath / dir
        paths = dirpath.rglob("*") if recursive else dirpath.iterdir()

        entries = []
        for path in paths:
            relative = path.relative_to(dirpath)
            # The hidden test must look at the relative path only: the
            # basepath itself may legitimately start with "." (e.g. "../data")
            # and must not cause every entry to be dropped.
            if any(part.startswith(".") for part in relative.parts):
                continue
            if only_files:
                if path.is_dir():
                    continue
            elif only_directories and not path.is_dir():
                continue
            entries.append(str(relative))
        return entries

    def schemas(self) -> list[str]:
        """List schemas (sub directories within basepath)."""
        return self.ls("", only_directories=True)

    def _schema(self, name: str) -> FSSchema:
        """Get infos on the schema, including the names of its tables."""
        schema_path = self._basepath / name
        tables = self.tables(schema=name)
        return FSSchema(name=name, path=schema_path, tables=tables)

    def schema(self, name: str) -> Schema:
        """Get the ``Schema`` model of the schema called ``name``."""
        return self._schema(name).ref

    def tables(self, schema: str) -> list[str]:
        """List tables of a schema (files and directories within it)."""
        return self.ls(schema)

    def _table(self, schema: str, name: str) -> FSTable:
        """Get infos on the table.

        A table stored as a directory is considered partitioned and the files
        it contains are its partitions; otherwise it has no partition.
        """
        table_path = self._basepath / schema / name
        is_partitionned = table_path.is_dir()
        if is_partitionned:
            partitions = self.ls(f"{schema}/{name}", only_files=True)
        else:
            partitions = []
        return FSTable(
            name=name,
            path=table_path,
            is_partitionned=is_partitionned,
            partitions=partitions,
        )

    def table(self, schema: str, name: str) -> Table:
        """Get the ``Table`` model of table ``name`` in ``schema``."""
        return self._table(schema, name).ref

    def _partition(self, schema: str, table: str, partition: str) -> FSPartition:
        """Get infos on the partition"""
        table_path = self._basepath / schema / table
        # FSPartition expects the path of the partition itself; its ``ref``
        # derives the table id from ``path.parent``.
        return FSPartition(name=partition, path=table_path / partition)

    def partition(self, schema: str, name: str, table: str = "") -> Partition:
        """Get the ``Partition`` model of partition ``name``.

        :param schema: name of the schema
        :param name: name of the partition
        :param table: name of the table holding the partition; it is required,
            and only has a default so that the two first parameters keep their
            historical position
        :raises TypeError: if ``table`` is not given
        """
        if not table:
            raise TypeError("partition() missing required argument: 'table'")
        return self._partition(schema, table, name).ref