refact: rename stage to repository
This commit is contained in:
64
dashboard/libs/repository/fs_repository.py
Normal file
64
dashboard/libs/repository/fs_repository.py
Normal file
@@ -0,0 +1,64 @@
|
||||
from pathlib import Path
|
||||
|
||||
import pandas as pd
|
||||
|
||||
from .repository import AbstractRepository
|
||||
|
||||
|
||||
class FSRepository(AbstractRepository):
|
||||
def __init__(self, name, basepath, metadata_engine=None):
|
||||
self.name = name
|
||||
|
||||
self.basepath = Path(basepath)
|
||||
self._metadata_engine = metadata_engine
|
||||
|
||||
def ls(self, dir, only_files=False, only_directories=False, recursive=False) -> list[str]:
|
||||
dirpath = Path(dir)
|
||||
|
||||
if only_files:
|
||||
return [str(f.relative_to(dirpath)) for f in dirpath.iterdir() if not f.is_dir()]
|
||||
|
||||
if only_directories:
|
||||
if recursive:
|
||||
return [str(f[0].relative_to(dirpath)) for f in dirpath.walk()]
|
||||
|
||||
return [str(f.relative_to(dirpath)) for f in dirpath.iterdir() if f.is_dir()]
|
||||
|
||||
return [str(f.relative_to(dirpath)) for f in dirpath.iterdir()]
|
||||
|
||||
def schemas(self, recursive=True) -> list[str]:
|
||||
dirpath = self.basepath
|
||||
return self.ls(dirpath, only_directories=True, recursive=True)
|
||||
|
||||
def tables(self, schema:str) -> list[str]:
|
||||
dirpath = self.basepath / schema
|
||||
return self.ls(dirpath, only_files=True)
|
||||
|
||||
def build_table_path(self, table:str, schema:str):
|
||||
table_path = self.basepath
|
||||
if schema == '.':
|
||||
return table_path / table
|
||||
return table_path / schema / table
|
||||
|
||||
def info(self, table:str, schema:str='.'):
|
||||
table_path = self.build_table_path(table, schema)
|
||||
pass
|
||||
|
||||
def read(self, table:str, schema:str='.', read_options={}):
|
||||
table_path = self.build_table_path(table, schema)
|
||||
extension = table_path.suffix
|
||||
if extension == '.csv':
|
||||
return pd.read_csv(table_path, **read_options)
|
||||
|
||||
if extension == '.xlsx':
|
||||
return pd.read_excel(table_path, **read_options)
|
||||
|
||||
raise ValueError("Can't open the table")
|
||||
|
||||
def write(self, table:str, content, schema:str='.'):
|
||||
table_path = self.build_table_path(table, schema)
|
||||
pass
|
||||
|
||||
def delete(self, table:str, schema:str='.'):
|
||||
table_path = self.build_table_path(table, schema)
|
||||
pass
|
||||
Reference in New Issue
Block a user