Compare commits
10 Commits
dbt-dlt
...
2de0e5ef5c
| Author | SHA1 | Date | |
|---|---|---|---|
| 2de0e5ef5c | |||
| 7fb7bc6f5c | |||
| 612df0a8eb | |||
| 74882ae572 | |||
| d8f2fb52e1 | |||
| f9bfb917bd | |||
| cdad13788a | |||
| 29c82ae597 | |||
| 5b53630688 | |||
| ed6d1c87d1 |
2
.gitignore
vendored
2
.gitignore
vendored
@@ -159,5 +159,3 @@ cython_debug/
|
||||
# and can be added to the global gitignore or merged into this file. For a more nuclear
|
||||
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
||||
#.idea/
|
||||
|
||||
*.duckdb
|
||||
|
||||
64
dashboard/app.py
Normal file
64
dashboard/app.py
Normal file
@@ -0,0 +1,64 @@
|
||||
import dash
|
||||
from dash import Dash, dcc, html
|
||||
|
||||
from .datalake import stages
|
||||
from .pages import config, home, repository, schema, table
|
||||
|
||||
external_scripts = [{"src": "https://cdn.tailwindcss.com"}]
|
||||
# external_script = ["https://tailwindcss.com/", {"src": "https://cdn.tailwindcss.com"}]
|
||||
|
||||
app = Dash(
|
||||
__name__,
|
||||
use_pages=True,
|
||||
external_scripts=external_scripts,
|
||||
suppress_callback_exceptions=True,
|
||||
)
|
||||
app.scripts.config.serve_locally = True
|
||||
dash.register_page(
|
||||
home.__name__,
|
||||
path="/",
|
||||
layout=home.layout,
|
||||
)
|
||||
dash.register_page(config.__name__, path="/config", layout=config.layout)
|
||||
dash.register_page(
|
||||
repository.__name__,
|
||||
path_template="/repository/<repository_name>",
|
||||
layout=repository.layout_factory(stages),
|
||||
)
|
||||
dash.register_page(
|
||||
schema.__name__,
|
||||
path_template="/stg/<repository_name>/schema/<schema_name>",
|
||||
layout=schema.layout_factory(stages),
|
||||
)
|
||||
dash.register_page(
|
||||
table.__name__,
|
||||
path_template="/stg/<repository_name>/schm/<schema_name>/table/<table_name>",
|
||||
layout=table.layout_factory(stages),
|
||||
)
|
||||
table.callback_factory(app)
|
||||
|
||||
app.layout = html.Div(
|
||||
[
|
||||
html.Div(
|
||||
[
|
||||
dcc.Link(
|
||||
html.H1(
|
||||
"Plesna",
|
||||
),
|
||||
href="/",
|
||||
className="text-4xl p-4 text-center grow align-baseline",
|
||||
),
|
||||
dcc.Link(
|
||||
"Config",
|
||||
href="/config",
|
||||
className="flex-none hover:bg-amber-100 p-4 align-middle",
|
||||
),
|
||||
],
|
||||
className="bg-amber-300 flex flex-row shadow",
|
||||
),
|
||||
dash.page_container,
|
||||
]
|
||||
)
|
||||
|
||||
if __name__ == "__main__":
|
||||
app.run(debug=True)
|
||||
0
dashboard/components/__init__.py
Normal file
0
dashboard/components/__init__.py
Normal file
57
dashboard/components/lists.py
Normal file
57
dashboard/components/lists.py
Normal file
@@ -0,0 +1,57 @@
|
||||
from dash import dcc, html
|
||||
|
||||
from ..libs.repository.repository import AbstractRepository
|
||||
|
||||
|
||||
def html_list_schema(stage:AbstractRepository, with_tables=True):
|
||||
""" Build html list of schema in stage """
|
||||
ul_classes = "ml-2"
|
||||
schema_baseurl = f"/stg/{stage.name}/schema/"
|
||||
if with_tables:
|
||||
return html.Ul(
|
||||
[
|
||||
html.Li(
|
||||
children = [
|
||||
dcc.Link(
|
||||
schema,
|
||||
href=schema_baseurl + schema,
|
||||
className="text-lg hover:underline"
|
||||
),
|
||||
html_list_table(stage, schema)
|
||||
],
|
||||
className=""
|
||||
) for schema in stage.schemas()
|
||||
],
|
||||
className=ul_classes
|
||||
)
|
||||
return html.Ul(
|
||||
[
|
||||
html.Li(
|
||||
dcc.Link(
|
||||
schema,
|
||||
href=schema_baseurl + schema,
|
||||
className="text-lg hover:underline"
|
||||
),
|
||||
) for schema in stage.schemas()
|
||||
],
|
||||
className=ul_classes
|
||||
)
|
||||
|
||||
|
||||
def html_list_table(stage:AbstractRepository, schema:str):
|
||||
""" Build html list of table in stage """
|
||||
table_baseurl = f"/stg/{stage.name}/schm/{schema}/table/"
|
||||
return html.Ul(
|
||||
[
|
||||
html.Li(
|
||||
dcc.Link(
|
||||
table,
|
||||
href=table_baseurl + table,
|
||||
className="hover:underline"
|
||||
),
|
||||
) for table in stage.tables(schema=schema)
|
||||
],
|
||||
className="ml-4"
|
||||
)
|
||||
|
||||
|
||||
14
dashboard/datalake.py
Normal file
14
dashboard/datalake.py
Normal file
@@ -0,0 +1,14 @@
|
||||
from dotenv import dotenv_values
|
||||
|
||||
from .libs.repository.fs_repository import FSRepository
|
||||
|
||||
env = {
|
||||
**dotenv_values(".env"),
|
||||
}
|
||||
|
||||
stages = {
|
||||
"raw": FSRepository("raw", f"{env['DATA_PATH']}/{env['RAW_SUBPATH']}"),
|
||||
"staging": FSRepository("staging", f"{env['DATA_PATH']}/{env['STAGING_SUBPATH']}"),
|
||||
"gold": FSRepository("gold", f"{env['DATA_PATH']}/{env['GOLD_SUBPATH']}"),
|
||||
"mart": FSRepository("mart", f"{env['DATA_PATH']}/{env['MART_SUBPATH']}"),
|
||||
}
|
||||
0
dashboard/libs/__init__.py
Normal file
0
dashboard/libs/__init__.py
Normal file
35
dashboard/libs/fs_schema.py
Normal file
35
dashboard/libs/fs_schema.py
Normal file
@@ -0,0 +1,35 @@
|
||||
from .schema import AbstractSchema
|
||||
from pathlib import Path
|
||||
|
||||
class FSSchema(AbstractSchema):
|
||||
def __init__(self, basepath, metadata_engine=None):
|
||||
self.basepath = basepath
|
||||
self._metadata_engine = metadata_engine
|
||||
|
||||
def ls(self, dir, only_files=True):
|
||||
dirpath = Path(dir)
|
||||
if only_files:
|
||||
return [f for f in dirpath.iterdir() if f.is_dir()]
|
||||
return [f for f in dirpath.iterdir()]
|
||||
|
||||
def tables(self, dir, only_files=True):
|
||||
dirpath = Path(dir)
|
||||
if only_files:
|
||||
return [f for f in dirpath.iterdir() if f.is_dir()]
|
||||
return [f for f in dirpath.iterdir()]
|
||||
|
||||
def info(self, path):
|
||||
path = Path(path)
|
||||
pass
|
||||
|
||||
def read(self, path):
|
||||
path = Path(path)
|
||||
pass
|
||||
|
||||
def write(self, path, content):
|
||||
path = Path(path)
|
||||
pass
|
||||
|
||||
def delete(self, path):
|
||||
path = Path(path)
|
||||
pass
|
||||
64
dashboard/libs/repository/fs_repository.py
Normal file
64
dashboard/libs/repository/fs_repository.py
Normal file
@@ -0,0 +1,64 @@
|
||||
from pathlib import Path
|
||||
|
||||
import pandas as pd
|
||||
|
||||
from .repository import AbstractRepository
|
||||
|
||||
|
||||
class FSRepository(AbstractRepository):
|
||||
def __init__(self, name, basepath, metadata_engine=None):
|
||||
self.name = name
|
||||
|
||||
self.basepath = Path(basepath)
|
||||
self._metadata_engine = metadata_engine
|
||||
|
||||
def ls(self, dir, only_files=False, only_directories=False, recursive=False) -> list[str]:
|
||||
dirpath = Path(dir)
|
||||
|
||||
if only_files:
|
||||
return [str(f.relative_to(dirpath)) for f in dirpath.iterdir() if not f.is_dir()]
|
||||
|
||||
if only_directories:
|
||||
if recursive:
|
||||
return [str(f[0].relative_to(dirpath)) for f in dirpath.walk()]
|
||||
|
||||
return [str(f.relative_to(dirpath)) for f in dirpath.iterdir() if f.is_dir()]
|
||||
|
||||
return [str(f.relative_to(dirpath)) for f in dirpath.iterdir()]
|
||||
|
||||
def schemas(self, recursive=True) -> list[str]:
|
||||
dirpath = self.basepath
|
||||
return self.ls(dirpath, only_directories=True, recursive=True)
|
||||
|
||||
def tables(self, schema:str) -> list[str]:
|
||||
dirpath = self.basepath / schema
|
||||
return self.ls(dirpath, only_files=True)
|
||||
|
||||
def build_table_path(self, table:str, schema:str):
|
||||
table_path = self.basepath
|
||||
if schema == '.':
|
||||
return table_path / table
|
||||
return table_path / schema / table
|
||||
|
||||
def info(self, table:str, schema:str='.'):
|
||||
table_path = self.build_table_path(table, schema)
|
||||
pass
|
||||
|
||||
def read(self, table:str, schema:str='.', read_options={}):
|
||||
table_path = self.build_table_path(table, schema)
|
||||
extension = table_path.suffix
|
||||
if extension == '.csv':
|
||||
return pd.read_csv(table_path, **read_options)
|
||||
|
||||
if extension == '.xlsx':
|
||||
return pd.read_excel(table_path, **read_options)
|
||||
|
||||
raise ValueError("Can't open the table")
|
||||
|
||||
def write(self, table:str, content, schema:str='.'):
|
||||
table_path = self.build_table_path(table, schema)
|
||||
pass
|
||||
|
||||
def delete(self, table:str, schema:str='.'):
|
||||
table_path = self.build_table_path(table, schema)
|
||||
pass
|
||||
5
dashboard/libs/repository/metadata.py
Normal file
5
dashboard/libs/repository/metadata.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from abc import ABC
|
||||
|
||||
|
||||
class AbstractMetadataEngine(ABC):
|
||||
pass
|
||||
36
dashboard/libs/repository/repository.py
Normal file
36
dashboard/libs/repository/repository.py
Normal file
@@ -0,0 +1,36 @@
|
||||
import abc
|
||||
from .metadata import AbstractMetadataEngine
|
||||
|
||||
|
||||
class AbstractRepository(abc.ABC):
|
||||
metadata_engine = AbstractMetadataEngine
|
||||
|
||||
@abc.abstractmethod
|
||||
def schemas():
|
||||
""" List schemas """
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def tables(schema):
|
||||
""" List table in schema"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def info(self, path):
|
||||
""" Get infos about a file"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def read(self, path):
|
||||
""" Get content of a file"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def write(self, path, content):
|
||||
""" Write content into the file"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def delete(self, path):
|
||||
""" Delete the file """
|
||||
raise NotImplementedError
|
||||
0
dashboard/pages/__init__.py
Normal file
0
dashboard/pages/__init__.py
Normal file
14
dashboard/pages/config.py
Normal file
14
dashboard/pages/config.py
Normal file
@@ -0,0 +1,14 @@
|
||||
from dash import html
|
||||
from dotenv import dotenv_values
|
||||
import os
|
||||
|
||||
env = {
|
||||
**dotenv_values(".env"),
|
||||
**os.environ,
|
||||
}
|
||||
|
||||
|
||||
layout = html.Div([
|
||||
html.H1('This is our Config page'),
|
||||
html.Ul(children = [html.Li(f"{k} = {v}") for k,v in env.items()]),
|
||||
])
|
||||
27
dashboard/pages/home.py
Normal file
27
dashboard/pages/home.py
Normal file
@@ -0,0 +1,27 @@
|
||||
from dash import dcc, html
|
||||
|
||||
from ..components.lists import html_list_schema
|
||||
from ..datalake import stages
|
||||
|
||||
layout = html.Div([
|
||||
html.Div(children=[
|
||||
html.Ul(
|
||||
children=[
|
||||
html.Li(
|
||||
children=[
|
||||
dcc.Link(
|
||||
stagename,
|
||||
href=f"/stage/{stagename}",
|
||||
className="text-2xl text-center p-2 bg-amber-100 rounded shadow"
|
||||
),
|
||||
html_list_schema(stage)
|
||||
],
|
||||
className="flex-1 bg-gray-100 rounded flex flex-col shadow"
|
||||
) for stagename, stage in stages.items()
|
||||
],
|
||||
className="flex flex-row space-x-2"
|
||||
)
|
||||
],
|
||||
className="w-full mt-4 px-2"
|
||||
),
|
||||
])
|
||||
18
dashboard/pages/repository.py
Normal file
18
dashboard/pages/repository.py
Normal file
@@ -0,0 +1,18 @@
|
||||
from dash import html
|
||||
|
||||
from ..components.lists import html_list_schema
|
||||
from ..libs.repository.repository import AbstractRepository
|
||||
|
||||
|
||||
def layout_factory(repositories: dict[str, AbstractRepository]):
|
||||
def layout(repository_name: str = ""):
|
||||
repository = repositories[repository_name]
|
||||
return html.Div(
|
||||
[
|
||||
html.H2(f"{repository.name}", className="text-2xl p-4 py-2"),
|
||||
html_list_schema(repository),
|
||||
],
|
||||
className="flex flex-col",
|
||||
)
|
||||
|
||||
return layout
|
||||
28
dashboard/pages/schema.py
Normal file
28
dashboard/pages/schema.py
Normal file
@@ -0,0 +1,28 @@
|
||||
from dash import dcc, html
|
||||
|
||||
from ..libs.repository.repository import AbstractRepository
|
||||
|
||||
|
||||
def layout_factory(repositories: dict[str, AbstractRepository]):
|
||||
def layout(repository_name: str = "", schema_name: str = ""):
|
||||
repository = repositories[repository_name]
|
||||
return html.Div(
|
||||
[
|
||||
html.H2(
|
||||
[
|
||||
dcc.Link(
|
||||
f"{repository.name}",
|
||||
href=f"/repository/{repository.name}",
|
||||
className="hover:underline",
|
||||
),
|
||||
html.Span(" > "),
|
||||
html.Span(
|
||||
f"{schema_name}",
|
||||
),
|
||||
],
|
||||
className="text-2xl p-4 py-2",
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
return layout
|
||||
130
dashboard/pages/table.py
Normal file
130
dashboard/pages/table.py
Normal file
@@ -0,0 +1,130 @@
|
||||
from dash import Input, Output, State, dash_table, dcc, html
|
||||
from dash.exceptions import PreventUpdate
|
||||
|
||||
from ..libs.repository.repository import AbstractRepository
|
||||
|
||||
|
||||
def layout_factory(repositories: dict[str,AbstractRepository]):
|
||||
def layout(repository_name:str="", schema_name:str="", table_name:str=""):
|
||||
repository = repositories[repository_name]
|
||||
df = repository.read(table=table_name, schema=schema_name)
|
||||
return html.Div([
|
||||
dcc.Store(id="table_backup"),
|
||||
html.Div([
|
||||
html.H2([
|
||||
dcc.Link(
|
||||
f"{repository.name}",
|
||||
href=f"/repository/{repository.name}",
|
||||
className="hover:underline"
|
||||
),
|
||||
html.Span(" > "),
|
||||
dcc.Link(
|
||||
f"{schema_name}",
|
||||
href=f"/stg/{repository.name}/schema/{schema_name}",
|
||||
className="hover:underline"
|
||||
),
|
||||
html.Span(" > "),
|
||||
html.Span(table_name),
|
||||
],
|
||||
className="text-2xl"
|
||||
),
|
||||
html.Div([
|
||||
html.Button(
|
||||
"Editer",
|
||||
id="btn_edit",
|
||||
className="rounded border px-2 py-1",
|
||||
style={"display": "block"}
|
||||
),
|
||||
html.Button(
|
||||
"Sauver",
|
||||
id="btn_save",
|
||||
className="rounded border px-2 py-1 border-green-500 hover:bg-green-500",
|
||||
style={"display": "none"}
|
||||
),
|
||||
html.Button(
|
||||
"Annuler",
|
||||
id="btn_cancel",
|
||||
className="rounded border px-2 py-1 border-red-500 hover:bg-red-500",
|
||||
style={"display": "none"}
|
||||
),
|
||||
],
|
||||
className="flex flex-row space-x-2",
|
||||
id="toolbar"
|
||||
),
|
||||
],
|
||||
className="flex flex-row justify-between p-4"
|
||||
),
|
||||
html.Div([
|
||||
html.Div([
|
||||
dash_table.DataTable(
|
||||
id="datatable",
|
||||
data=df.to_dict('records'),
|
||||
columns=[{"name": i, "id": i} for i in df.columns],
|
||||
filter_action="native",
|
||||
sort_action="native",
|
||||
sort_mode="multi",
|
||||
editable=False
|
||||
)
|
||||
])
|
||||
],
|
||||
className="overflow-y-auto"
|
||||
),
|
||||
],
|
||||
className="p-2"
|
||||
)
|
||||
return layout
|
||||
|
||||
|
||||
def callback_factory(app):
|
||||
@app.callback(
|
||||
Output("datatable", 'editable', allow_duplicate=True),
|
||||
Output("table_backup", 'data'),
|
||||
Input("btn_edit", "n_clicks"),
|
||||
State("datatable", 'data'),
|
||||
prevent_initial_call=True
|
||||
)
|
||||
def activate_editable(n_clicks, df_src):
|
||||
if n_clicks is None:
|
||||
raise PreventUpdate
|
||||
if n_clicks > 0:
|
||||
df_backup = df_src.copy()
|
||||
return True, df_backup
|
||||
raise PreventUpdate
|
||||
|
||||
@app.callback(
|
||||
Output("datatable", 'editable', allow_duplicate=True),
|
||||
Output("datatable", 'data', allow_duplicate=True),
|
||||
Input("btn_cancel", "n_clicks"),
|
||||
State("table_backup", 'data'),
|
||||
prevent_initial_call=True
|
||||
)
|
||||
def cancel_modifications(n_clicks, data):
|
||||
if n_clicks is None:
|
||||
raise PreventUpdate
|
||||
if n_clicks > 0 and data is not None:
|
||||
return False, data.copy()
|
||||
raise PreventUpdate
|
||||
|
||||
@app.callback(
|
||||
Output("datatable", 'editable'),
|
||||
Output("datatable", 'data'),
|
||||
Input("btn_save", "n_clicks"),
|
||||
State("datatable", 'editable'),
|
||||
)
|
||||
def save_modifications(n_clicks, editable):
|
||||
if n_clicks is None:
|
||||
raise PreventUpdate
|
||||
if n_clicks > 0:
|
||||
return not editable
|
||||
return editable
|
||||
|
||||
@app.callback(
|
||||
Output("btn_edit", "style"),
|
||||
Output("btn_save", "style"),
|
||||
Output("btn_cancel", "style"),
|
||||
Input("datatable", "editable"),
|
||||
)
|
||||
def toolbar(editable):
|
||||
if editable:
|
||||
return {"display": "none"}, {"display": "block"}, {"display": "block"}
|
||||
return {"display": "block"}, {"display": "none"}, {"display": "none"}
|
||||
@@ -1,33 +0,0 @@
|
||||
import dlt
|
||||
from pathlib import Path
|
||||
from pdf_oralia.extract import from_pdf
|
||||
import pdfplumber
|
||||
|
||||
DATA_PATH = Path("datas/")
|
||||
assert DATA_PATH.exists()
|
||||
RAW_CRG_PDF = DATA_PATH / "pdfs"
|
||||
assert RAW_CRG_PDF.exists()
|
||||
|
||||
|
||||
@dlt.resource(name="crg")
|
||||
def crg_pdf(filename):
|
||||
print(filename)
|
||||
pdf = pdfplumber.open(filename)
|
||||
try:
|
||||
df_charge, df_loc = from_pdf(pdf)
|
||||
except ValueError as e:
|
||||
print(f"\tExtract Error: {e}")
|
||||
pass
|
||||
else:
|
||||
for row in df_charge.to_dict("records"):
|
||||
yield row
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pipeline = dlt.pipeline(
|
||||
pipeline_name='raw', destination="duckdb", dataset_name="crg"
|
||||
)
|
||||
|
||||
for pdf_file in RAW_CRG_PDF.glob("**/*.pdf"):
|
||||
load_info = pipeline.run(crg_pdf(pdf_file), table_name='charge')
|
||||
print(load_info)
|
||||
Reference in New Issue
Block a user