Compare commits
21 Commits
e2805f9af2
...
dashboard
Author | SHA1 | Date | |
---|---|---|---|
e794242a03 | |||
5450de8628 | |||
08c7fbe4c5 | |||
959b53e6a0 | |||
91e229eab2 | |||
2de0e5ef5c | |||
7fb7bc6f5c | |||
612df0a8eb | |||
74882ae572 | |||
d8f2fb52e1 | |||
f9bfb917bd | |||
cdad13788a | |||
29c82ae597 | |||
5b53630688 | |||
ed6d1c87d1 | |||
1ed6ed43ed | |||
215e26b84f | |||
b60fa3be17 | |||
a1578f813b | |||
d872cd7681 | |||
bfebd6b58a |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,3 +1,4 @@
|
||||
datas/
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
|
19
Makefile
19
Makefile
@@ -42,8 +42,25 @@ clean_built:
|
||||
rm -rf $(DATA_BASE)/staging/**/*.csv
|
||||
rm -rf $(DATA_BASE)/gold/**/*.csv
|
||||
rm -rf $(DATA_BASE)/datamart/**/*.csv
|
||||
rm -rf $(DATA_BASE)/datamart/**/*.xlsx
|
||||
|
||||
run_ingest:
|
||||
python -m scripts ingest
|
||||
|
||||
run_feature:
|
||||
python -m scripts feature
|
||||
|
||||
run_datamart:
|
||||
python -m scripts datamart
|
||||
|
||||
build: clean_built run_ingest run_feature run_datamart
|
||||
|
||||
clean_all: clean_built clean_raw
|
||||
|
||||
import_nextcloud:
|
||||
rsync -a ~/Nextcloud/PLESNA\ Compta\ SYSTEM/Histoire/ ./datas/Histoire
|
||||
rsync -av ~/Nextcloud/PLESNA\ Compta\ SYSTEM/Histoire/ ./datas/Histoire
|
||||
|
||||
push_nextcloud:
|
||||
rsync -av ./datas/datamart/ ~/Nextcloud/PLESNA\ Compta\ SYSTEM/DataMart
|
||||
|
||||
|
||||
|
64
dashboard/app.py
Normal file
64
dashboard/app.py
Normal file
@@ -0,0 +1,64 @@
|
||||
import dash
|
||||
from dash import Dash, dcc, html
|
||||
|
||||
from .datalake import stages
|
||||
from .pages import config, home, repository, schema, table
|
||||
|
||||
external_scripts = [{"src": "https://cdn.tailwindcss.com"}]
|
||||
# external_script = ["https://tailwindcss.com/", {"src": "https://cdn.tailwindcss.com"}]
|
||||
|
||||
app = Dash(
|
||||
__name__,
|
||||
use_pages=True,
|
||||
external_scripts=external_scripts,
|
||||
suppress_callback_exceptions=True,
|
||||
)
|
||||
app.scripts.config.serve_locally = True
|
||||
dash.register_page(
|
||||
home.__name__,
|
||||
path="/",
|
||||
layout=home.layout,
|
||||
)
|
||||
dash.register_page(config.__name__, path="/config", layout=config.layout)
|
||||
dash.register_page(
|
||||
repository.__name__,
|
||||
path_template="/repository/<repository_name>",
|
||||
layout=repository.layout_factory(stages),
|
||||
)
|
||||
dash.register_page(
|
||||
schema.__name__,
|
||||
path_template="/stg/<repository_name>/schema/<schema_name>",
|
||||
layout=schema.layout_factory(stages),
|
||||
)
|
||||
dash.register_page(
|
||||
table.__name__,
|
||||
path_template="/stg/<repository_name>/schm/<schema_name>/table/<table_name>",
|
||||
layout=table.layout_factory(stages),
|
||||
)
|
||||
table.callback_factory(app)
|
||||
|
||||
app.layout = html.Div(
|
||||
[
|
||||
html.Div(
|
||||
[
|
||||
dcc.Link(
|
||||
html.H1(
|
||||
"Plesna",
|
||||
),
|
||||
href="/",
|
||||
className="text-4xl p-4 text-center grow align-baseline",
|
||||
),
|
||||
dcc.Link(
|
||||
"Config",
|
||||
href="/config",
|
||||
className="flex-none hover:bg-amber-100 p-4 align-middle",
|
||||
),
|
||||
],
|
||||
className="bg-amber-300 flex flex-row shadow",
|
||||
),
|
||||
dash.page_container,
|
||||
]
|
||||
)
|
||||
|
||||
if __name__ == "__main__":
|
||||
app.run(debug=True)
|
0
dashboard/components/__init__.py
Normal file
0
dashboard/components/__init__.py
Normal file
57
dashboard/components/lists.py
Normal file
57
dashboard/components/lists.py
Normal file
@@ -0,0 +1,57 @@
|
||||
from dash import dcc, html
|
||||
|
||||
from ..libs.repository.repository import AbstractRepository
|
||||
|
||||
|
||||
def html_list_schema(stage:AbstractRepository, with_tables=True):
|
||||
""" Build html list of schema in stage """
|
||||
ul_classes = "ml-2"
|
||||
schema_baseurl = f"/stg/{stage.name}/schema/"
|
||||
if with_tables:
|
||||
return html.Ul(
|
||||
[
|
||||
html.Li(
|
||||
children = [
|
||||
dcc.Link(
|
||||
schema,
|
||||
href=schema_baseurl + schema,
|
||||
className="text-lg hover:underline"
|
||||
),
|
||||
html_list_table(stage, schema)
|
||||
],
|
||||
className=""
|
||||
) for schema in stage.schemas()
|
||||
],
|
||||
className=ul_classes
|
||||
)
|
||||
return html.Ul(
|
||||
[
|
||||
html.Li(
|
||||
dcc.Link(
|
||||
schema,
|
||||
href=schema_baseurl + schema,
|
||||
className="text-lg hover:underline"
|
||||
),
|
||||
) for schema in stage.schemas()
|
||||
],
|
||||
className=ul_classes
|
||||
)
|
||||
|
||||
|
||||
def html_list_table(stage:AbstractRepository, schema:str):
|
||||
""" Build html list of table in stage """
|
||||
table_baseurl = f"/stg/{stage.name}/schm/{schema}/table/"
|
||||
return html.Ul(
|
||||
[
|
||||
html.Li(
|
||||
dcc.Link(
|
||||
table,
|
||||
href=table_baseurl + table,
|
||||
className="hover:underline"
|
||||
),
|
||||
) for table in stage.tables(schema=schema)
|
||||
],
|
||||
className="ml-4"
|
||||
)
|
||||
|
||||
|
14
dashboard/datalake.py
Normal file
14
dashboard/datalake.py
Normal file
@@ -0,0 +1,14 @@
|
||||
from dotenv import dotenv_values
|
||||
|
||||
from .libs.repository.fs_repository import FSRepository
|
||||
|
||||
env = {
|
||||
**dotenv_values(".env"),
|
||||
}
|
||||
|
||||
stages = {
|
||||
"raw": FSRepository("raw", f"{env['DATA_PATH']}/{env['RAW_SUBPATH']}"),
|
||||
"staging": FSRepository("staging", f"{env['DATA_PATH']}/{env['STAGING_SUBPATH']}"),
|
||||
"gold": FSRepository("gold", f"{env['DATA_PATH']}/{env['GOLD_SUBPATH']}"),
|
||||
"mart": FSRepository("mart", f"{env['DATA_PATH']}/{env['MART_SUBPATH']}"),
|
||||
}
|
0
dashboard/libs/__init__.py
Normal file
0
dashboard/libs/__init__.py
Normal file
0
dashboard/libs/flux/__init__.py
Normal file
0
dashboard/libs/flux/__init__.py
Normal file
70
dashboard/libs/flux/flux.py
Normal file
70
dashboard/libs/flux/flux.py
Normal file
@@ -0,0 +1,70 @@
|
||||
from collections.abc import Callable
|
||||
from datetime import datetime
|
||||
|
||||
import pandas as pd
|
||||
from pydantic import BaseModel
|
||||
|
||||
from ..repository.repository import AbstractRepository
|
||||
|
||||
|
||||
class Schema(BaseModel):
|
||||
repository: str
|
||||
schema: str
|
||||
|
||||
|
||||
class Table(BaseModel):
|
||||
repository: str
|
||||
schema: str
|
||||
table: str
|
||||
|
||||
|
||||
class Flux(BaseModel):
|
||||
sources: list[Table]
|
||||
destinations: dict[str, Table]
|
||||
transformation: Callable[[list[pd.DataFrame]], dict[str, pd.DataFrame]]
|
||||
|
||||
|
||||
class State(BaseModel):
|
||||
statuses: dict[str, dict]
|
||||
qty_out: int
|
||||
failed_lines: list[str]
|
||||
start: datetime
|
||||
end: datetime
|
||||
|
||||
|
||||
Repositories = dict[str, AbstractRepository]
|
||||
|
||||
|
||||
def open_source(repositories: Repositories, source: Table) -> pd.DataFrame:
|
||||
return repositories[source.repository].read(source.table, source.schema)
|
||||
|
||||
|
||||
def write_source(
|
||||
content: pd.DataFrame, repositories: Repositories, destination: Table
|
||||
) -> str:
|
||||
return repositories[destination.repository].write(
|
||||
content, destination.table, destination.schema
|
||||
)
|
||||
|
||||
|
||||
def consume_flux(flux: Flux, repositories: dict[str, AbstractRepository]) -> State:
|
||||
start = datetime.now()
|
||||
src_dfs = [open_source(repositories, source) for source in flux.sources]
|
||||
|
||||
built_dfs = flux.transformation(src_dfs)
|
||||
|
||||
statuses = {
|
||||
dest: write_source(df, repositories, flux.destinations[dest])
|
||||
for dest, df in built_dfs.items()
|
||||
}
|
||||
|
||||
end = datetime.now()
|
||||
qty_out = 0
|
||||
failed_lines = []
|
||||
return State(
|
||||
statuses=statuses,
|
||||
qty_out=qty_out,
|
||||
failed_lines=failed_lines,
|
||||
start=start,
|
||||
end=end,
|
||||
)
|
86
dashboard/libs/repository/fs_repository.py
Normal file
86
dashboard/libs/repository/fs_repository.py
Normal file
@@ -0,0 +1,86 @@
|
||||
from pathlib import Path
|
||||
|
||||
import pandas as pd
|
||||
|
||||
from .repository import AbstractRepository
|
||||
|
||||
ACCEPTABLE_EXTENTIONS = {
|
||||
"csv": [".csv"],
|
||||
"excel": [".xls", ".xlsx"],
|
||||
}
|
||||
|
||||
class FSRepository(AbstractRepository):
|
||||
def __init__(self, name, basepath, metadata_engine=None):
|
||||
self.name = name
|
||||
|
||||
self.basepath = Path(basepath)
|
||||
assert self.basepath.exists()
|
||||
self._metadata_engine = metadata_engine
|
||||
|
||||
def ls(
|
||||
self, dir="", only_files=False, only_directories=False, recursive=False
|
||||
) -> list[str]:
|
||||
dirpath = self.basepath / dir
|
||||
|
||||
if only_files:
|
||||
return [
|
||||
str(f.relative_to(dirpath))
|
||||
for f in dirpath.iterdir()
|
||||
if not f.is_dir() and not str(f).startswith(".")
|
||||
]
|
||||
|
||||
if only_directories:
|
||||
if recursive:
|
||||
return [
|
||||
str(f[0].relative_to(dirpath))
|
||||
for f in dirpath.walk()
|
||||
if not str(f).startswith(".")
|
||||
]
|
||||
|
||||
return [
|
||||
str(f.relative_to(dirpath))
|
||||
for f in dirpath.iterdir()
|
||||
if f.is_dir() and not str(f).startswith(".")
|
||||
]
|
||||
|
||||
return [
|
||||
str(f.relative_to(dirpath))
|
||||
for f in dirpath.iterdir()
|
||||
if not str(f).startswith(".")
|
||||
]
|
||||
|
||||
def schemas(self, recursive=True) -> list[str]:
|
||||
return self.ls("", only_directories=True, recursive=True)
|
||||
|
||||
def tables(self, schema: str = ".") -> list[str]:
|
||||
return self.ls(schema, only_files=True)
|
||||
|
||||
def build_table_path(self, table: str, schema: str):
|
||||
table_path = self.basepath
|
||||
if schema == ".":
|
||||
return table_path / table
|
||||
return table_path / schema / table
|
||||
|
||||
def infos(self, table: str, schema: str = "."):
|
||||
table_path = self.build_table_path(table, schema)
|
||||
pass
|
||||
|
||||
def read(self, table: str, schema: str = ".", **read_options):
|
||||
table_path = self.build_table_path(table, schema)
|
||||
assert table_path.exists()
|
||||
extension = table_path.suffix
|
||||
if extension in ACCEPTABLE_EXTENTIONS["csv"]:
|
||||
return pd.read_csv(table_path, **read_options)
|
||||
|
||||
if extension in ACCEPTABLE_EXTENTIONS["excel"]:
|
||||
return pd.read_excel(table_path, engine = "openpyxl", **read_options)
|
||||
|
||||
raise ValueError("Bad extention. Can't open the table.")
|
||||
|
||||
def write(self, content, table: str, schema: str = "."):
|
||||
table_path = self.build_table_path(table, schema)
|
||||
pass
|
||||
|
||||
def delete_table(self, table: str, schema: str = "."):
|
||||
table_path = self.build_table_path(table, schema)
|
||||
pass
|
5
dashboard/libs/repository/metadata.py
Normal file
5
dashboard/libs/repository/metadata.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from abc import ABC
|
||||
|
||||
|
||||
class AbstractMetadataEngine(ABC):
|
||||
pass
|
37
dashboard/libs/repository/repository.py
Normal file
37
dashboard/libs/repository/repository.py
Normal file
@@ -0,0 +1,37 @@
|
||||
import abc
|
||||
|
||||
from .metadata import AbstractMetadataEngine
|
||||
|
||||
|
||||
class AbstractRepository(abc.ABC):
|
||||
metadata_engine = AbstractMetadataEngine
|
||||
|
||||
@abc.abstractmethod
|
||||
def schemas(self) -> list[str]:
|
||||
"""List schemas"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def tables(self, schema) -> list[str]:
|
||||
"""List table in schema"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def infos(self, table: str, schema: str) -> dict[str, str]:
|
||||
"""Get infos about the table"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def read(self, table: str, schema: str):
|
||||
"""Get content of the table"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def write(self, content, table: str, schema: str):
|
||||
"""Write content into the table"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def delete_table(self, table: str, schema: str):
|
||||
"""Delete the table"""
|
||||
raise NotImplementedError
|
0
dashboard/pages/__init__.py
Normal file
0
dashboard/pages/__init__.py
Normal file
14
dashboard/pages/config.py
Normal file
14
dashboard/pages/config.py
Normal file
@@ -0,0 +1,14 @@
|
||||
from dash import html
|
||||
from dotenv import dotenv_values
|
||||
import os
|
||||
|
||||
env = {
|
||||
**dotenv_values(".env"),
|
||||
**os.environ,
|
||||
}
|
||||
|
||||
|
||||
layout = html.Div([
|
||||
html.H1('This is our Config page'),
|
||||
html.Ul(children = [html.Li(f"{k} = {v}") for k,v in env.items()]),
|
||||
])
|
27
dashboard/pages/home.py
Normal file
27
dashboard/pages/home.py
Normal file
@@ -0,0 +1,27 @@
|
||||
from dash import dcc, html
|
||||
|
||||
from ..components.lists import html_list_schema
|
||||
from ..datalake import stages
|
||||
|
||||
layout = html.Div([
|
||||
html.Div(children=[
|
||||
html.Ul(
|
||||
children=[
|
||||
html.Li(
|
||||
children=[
|
||||
dcc.Link(
|
||||
stagename,
|
||||
href=f"/stage/{stagename}",
|
||||
className="text-2xl text-center p-2 bg-amber-100 rounded shadow"
|
||||
),
|
||||
html_list_schema(stage)
|
||||
],
|
||||
className="flex-1 bg-gray-100 rounded flex flex-col shadow"
|
||||
) for stagename, stage in stages.items()
|
||||
],
|
||||
className="flex flex-row space-x-2"
|
||||
)
|
||||
],
|
||||
className="w-full mt-4 px-2"
|
||||
),
|
||||
])
|
18
dashboard/pages/repository.py
Normal file
18
dashboard/pages/repository.py
Normal file
@@ -0,0 +1,18 @@
|
||||
from dash import html
|
||||
|
||||
from ..components.lists import html_list_schema
|
||||
from ..libs.repository.repository import AbstractRepository
|
||||
|
||||
|
||||
def layout_factory(repositories: dict[str, AbstractRepository]):
|
||||
def layout(repository_name: str = ""):
|
||||
repository = repositories[repository_name]
|
||||
return html.Div(
|
||||
[
|
||||
html.H2(f"{repository.name}", className="text-2xl p-4 py-2"),
|
||||
html_list_schema(repository),
|
||||
],
|
||||
className="flex flex-col",
|
||||
)
|
||||
|
||||
return layout
|
28
dashboard/pages/schema.py
Normal file
28
dashboard/pages/schema.py
Normal file
@@ -0,0 +1,28 @@
|
||||
from dash import dcc, html
|
||||
|
||||
from ..libs.repository.repository import AbstractRepository
|
||||
|
||||
|
||||
def layout_factory(repositories: dict[str, AbstractRepository]):
|
||||
def layout(repository_name: str = "", schema_name: str = ""):
|
||||
repository = repositories[repository_name]
|
||||
return html.Div(
|
||||
[
|
||||
html.H2(
|
||||
[
|
||||
dcc.Link(
|
||||
f"{repository.name}",
|
||||
href=f"/repository/{repository.name}",
|
||||
className="hover:underline",
|
||||
),
|
||||
html.Span(" > "),
|
||||
html.Span(
|
||||
f"{schema_name}",
|
||||
),
|
||||
],
|
||||
className="text-2xl p-4 py-2",
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
return layout
|
130
dashboard/pages/table.py
Normal file
130
dashboard/pages/table.py
Normal file
@@ -0,0 +1,130 @@
|
||||
from dash import Input, Output, State, dash_table, dcc, html
|
||||
from dash.exceptions import PreventUpdate
|
||||
|
||||
from ..libs.repository.repository import AbstractRepository
|
||||
|
||||
|
||||
def layout_factory(repositories: dict[str,AbstractRepository]):
|
||||
def layout(repository_name:str="", schema_name:str="", table_name:str=""):
|
||||
repository = repositories[repository_name]
|
||||
df = repository.read(table=table_name, schema=schema_name)
|
||||
return html.Div([
|
||||
dcc.Store(id="table_backup"),
|
||||
html.Div([
|
||||
html.H2([
|
||||
dcc.Link(
|
||||
f"{repository.name}",
|
||||
href=f"/repository/{repository.name}",
|
||||
className="hover:underline"
|
||||
),
|
||||
html.Span(" > "),
|
||||
dcc.Link(
|
||||
f"{schema_name}",
|
||||
href=f"/stg/{repository.name}/schema/{schema_name}",
|
||||
className="hover:underline"
|
||||
),
|
||||
html.Span(" > "),
|
||||
html.Span(table_name),
|
||||
],
|
||||
className="text-2xl"
|
||||
),
|
||||
html.Div([
|
||||
html.Button(
|
||||
"Editer",
|
||||
id="btn_edit",
|
||||
className="rounded border px-2 py-1",
|
||||
style={"display": "block"}
|
||||
),
|
||||
html.Button(
|
||||
"Sauver",
|
||||
id="btn_save",
|
||||
className="rounded border px-2 py-1 border-green-500 hover:bg-green-500",
|
||||
style={"display": "none"}
|
||||
),
|
||||
html.Button(
|
||||
"Annuler",
|
||||
id="btn_cancel",
|
||||
className="rounded border px-2 py-1 border-red-500 hover:bg-red-500",
|
||||
style={"display": "none"}
|
||||
),
|
||||
],
|
||||
className="flex flex-row space-x-2",
|
||||
id="toolbar"
|
||||
),
|
||||
],
|
||||
className="flex flex-row justify-between p-4"
|
||||
),
|
||||
html.Div([
|
||||
html.Div([
|
||||
dash_table.DataTable(
|
||||
id="datatable",
|
||||
data=df.to_dict('records'),
|
||||
columns=[{"name": i, "id": i} for i in df.columns],
|
||||
filter_action="native",
|
||||
sort_action="native",
|
||||
sort_mode="multi",
|
||||
editable=False
|
||||
)
|
||||
])
|
||||
],
|
||||
className="overflow-y-auto"
|
||||
),
|
||||
],
|
||||
className="p-2"
|
||||
)
|
||||
return layout
|
||||
|
||||
|
||||
def callback_factory(app):
|
||||
@app.callback(
|
||||
Output("datatable", 'editable', allow_duplicate=True),
|
||||
Output("table_backup", 'data'),
|
||||
Input("btn_edit", "n_clicks"),
|
||||
State("datatable", 'data'),
|
||||
prevent_initial_call=True
|
||||
)
|
||||
def activate_editable(n_clicks, df_src):
|
||||
if n_clicks is None:
|
||||
raise PreventUpdate
|
||||
if n_clicks > 0:
|
||||
df_backup = df_src.copy()
|
||||
return True, df_backup
|
||||
raise PreventUpdate
|
||||
|
||||
@app.callback(
|
||||
Output("datatable", 'editable', allow_duplicate=True),
|
||||
Output("datatable", 'data', allow_duplicate=True),
|
||||
Input("btn_cancel", "n_clicks"),
|
||||
State("table_backup", 'data'),
|
||||
prevent_initial_call=True
|
||||
)
|
||||
def cancel_modifications(n_clicks, data):
|
||||
if n_clicks is None:
|
||||
raise PreventUpdate
|
||||
if n_clicks > 0 and data is not None:
|
||||
return False, data.copy()
|
||||
raise PreventUpdate
|
||||
|
||||
@app.callback(
|
||||
Output("datatable", 'editable'),
|
||||
Output("datatable", 'data'),
|
||||
Input("btn_save", "n_clicks"),
|
||||
State("datatable", 'editable'),
|
||||
)
|
||||
def save_modifications(n_clicks, editable):
|
||||
if n_clicks is None:
|
||||
raise PreventUpdate
|
||||
if n_clicks > 0:
|
||||
return not editable
|
||||
return editable
|
||||
|
||||
@app.callback(
|
||||
Output("btn_edit", "style"),
|
||||
Output("btn_save", "style"),
|
||||
Output("btn_cancel", "style"),
|
||||
Input("datatable", "editable"),
|
||||
)
|
||||
def toolbar(editable):
|
||||
if editable:
|
||||
return {"display": "none"}, {"display": "block"}, {"display": "block"}
|
||||
return {"display": "block"}, {"display": "none"}, {"display": "none"}
|
File diff suppressed because one or more lines are too long
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@@ -1,206 +0,0 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"id": "bc224455-95ed-4e33-864d-442396301cd4",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# Staging vers Gold"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 1,
|
||||
"id": "d5dff9f3-ec7d-4fc7-8471-5ed1fbf6cf06",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"from pathlib import Path\n",
|
||||
"import pandas as pd"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 2,
|
||||
"id": "4e5779f6-e0ad-46f8-b684-49af4205f084",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"staging_path = Path(\"../PLESNA Compta SYSTEM/staging\")\n",
|
||||
"assert staging_path.exists()\n",
|
||||
"gold_path = Path(\"../PLESNA Compta SYSTEM/gold\")\n",
|
||||
"assert gold_path.exists()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 3,
|
||||
"id": "2074af18-4f81-49cb-9d9c-f50e7408e7fc",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"def to_csv(df, dest):\n",
|
||||
" if dest.exists():\n",
|
||||
" df.to_csv(dest, mode=\"a\", header=False, index=False)\n",
|
||||
" else:\n",
|
||||
" df.to_csv(dest, index=False)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"id": "cc74ba91-855a-41e7-8709-122425f98fb6",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"### clean gold"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 4,
|
||||
"id": "82de8bc5-8d1e-47fb-af28-076ed90835a9",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"for f in gold_path.glob(\"**/*.csv\"):\n",
|
||||
" f.unlink()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"id": "539446e1-835e-4d79-a8d8-ddd5823f30f9",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## CRG"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 5,
|
||||
"id": "a6423b7d-657f-4897-8dd3-fbca68318367",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"[PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2020.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2018.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2022.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2021.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2023.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2019.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2017.csv')]\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"crg_path = staging_path / \"CRG\"\n",
|
||||
"assert crg_path.exists()\n",
|
||||
"crg_files = list(crg_path.glob(\"*.csv\"))\n",
|
||||
"print(crg_files)\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 6,
|
||||
"id": "edcf15c4-aa3c-40c7-805d-ae8933decf8c",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"../PLESNA Compta SYSTEM/gold/CRG/2020.csv\n",
|
||||
"../PLESNA Compta SYSTEM/gold/CRG/2018.csv\n",
|
||||
"../PLESNA Compta SYSTEM/gold/CRG/2022.csv\n",
|
||||
"../PLESNA Compta SYSTEM/gold/CRG/2021.csv\n",
|
||||
"../PLESNA Compta SYSTEM/gold/CRG/2023.csv\n",
|
||||
"../PLESNA Compta SYSTEM/gold/CRG/2019.csv\n",
|
||||
"../PLESNA Compta SYSTEM/gold/CRG/2017.csv\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"for f in crg_files:\n",
|
||||
" df = pd.read_csv(f)\n",
|
||||
" df = df.assign(\n",
|
||||
" Impact = df[\"Crédit\"] - df[\"Débit\"],\n",
|
||||
" )\n",
|
||||
" dest = gold_path / f\"CRG/{f.name}\"\n",
|
||||
" print(dest)\n",
|
||||
" to_csv(df, dest)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"id": "811f6b89-be5a-4290-b3d5-466ec42eb3ae",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Banque"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 7,
|
||||
"id": "c017b0a4-8c41-482e-85b1-4a10be84270b",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"[PosixPath('../PLESNA Compta SYSTEM/staging/Banque/2020.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/Banque/2022.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/Banque/2021.csv')]\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"banque_path = staging_path / \"Banque\"\n",
|
||||
"assert banque_path.exists()\n",
|
||||
"banque_files = list(banque_path.glob(\"*.csv\"))\n",
|
||||
"print(banque_files)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 8,
|
||||
"id": "b04b0d11-dd74-4463-bd6f-c59528cc080e",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"../PLESNA Compta SYSTEM/gold/Banque/2020.csv\n",
|
||||
"../PLESNA Compta SYSTEM/gold/Banque/2022.csv\n",
|
||||
"../PLESNA Compta SYSTEM/gold/Banque/2021.csv\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"for f in banque_files:\n",
|
||||
" df = pd.read_csv(f)\n",
|
||||
" df = df.assign(\n",
|
||||
" Impact = df[\"Crédit\"] - df[\"Débit\"],\n",
|
||||
" )\n",
|
||||
" dest = gold_path / f\"Banque/{f.name}\"\n",
|
||||
" print(dest)\n",
|
||||
" to_csv(df, dest)"
|
||||
]
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "Python 3 (ipykernel)",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
"language_info": {
|
||||
"codemirror_mode": {
|
||||
"name": "ipython",
|
||||
"version": 3
|
||||
},
|
||||
"file_extension": ".py",
|
||||
"mimetype": "text/x-python",
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.11.6"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 5
|
||||
}
|
@@ -1,5 +1,6 @@
|
||||
jupyter==1.0.0
|
||||
pandas==1.5.0
|
||||
pdf-oralia==0.3.11
|
||||
pydantic==2.6.1
|
||||
pandas==2.2.2
|
||||
pydantic==2.8.2
|
||||
click==8.1.7
|
||||
openpyxl==3.1.5
|
||||
xlrd==2.0.1
|
||||
|
@@ -79,7 +79,11 @@ def feature():
|
||||
def datamart():
|
||||
from .gold_mart import FLUXES_LOT
|
||||
|
||||
consume_fluxes(fluxes=FLUXES_LOT, origin_path=GOLD_PATH, dest_path=MART_PATH)
|
||||
consume_fluxes(
|
||||
fluxes=FLUXES_LOT,
|
||||
origin_path=GOLD_PATH,
|
||||
dest_path=MART_PATH,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@@ -4,7 +4,7 @@ from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
|
||||
import pandas as pd
|
||||
from pydantic import BaseModel
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class Source(BaseModel):
|
||||
@@ -38,21 +38,51 @@ class Transformation(BaseModel):
|
||||
extra_kwrds: dict = {}
|
||||
|
||||
|
||||
def to_csv(df, dest_basename: Path) -> Path:
|
||||
dest = dest_basename.parent / (dest_basename.stem + ".csv")
|
||||
if dest.exists():
|
||||
df.to_csv(dest, mode="a", header=False, index=False)
|
||||
else:
|
||||
df.to_csv(dest, index=False)
|
||||
return dest
|
||||
|
||||
|
||||
def to_excel(df, dest_basename: Path) -> Path:
|
||||
dest = dest_basename.parent / (dest_basename.stem + ".xlsx")
|
||||
if dest.exists():
|
||||
raise ValueError(f"The destination exits {dest}")
|
||||
else:
|
||||
df.to_excel(dest)
|
||||
return dest
|
||||
|
||||
|
||||
class Destination(BaseModel):
|
||||
name: str
|
||||
writer: Callable = Field(to_csv)
|
||||
|
||||
def _write(
|
||||
self,
|
||||
df: pd.DataFrame,
|
||||
dest_basename: Path,
|
||||
writing_func: Callable | None = None,
|
||||
) -> Path:
|
||||
if writing_func is None:
|
||||
writing_func = self.writer
|
||||
|
||||
return writing_func(df, dest_basename)
|
||||
|
||||
def write(
|
||||
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable
|
||||
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable | None = None
|
||||
) -> list[Path]:
|
||||
dest_basename = dest_path / self.name
|
||||
return [writing_func(df, dest_basename)]
|
||||
return [self._write(df, dest_basename, writing_func)]
|
||||
|
||||
|
||||
class SplitDestination(Destination):
|
||||
split_column: str
|
||||
|
||||
def write(
|
||||
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable
|
||||
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable | None = None
|
||||
) -> list[Path]:
|
||||
wrote_files = []
|
||||
|
||||
@@ -60,7 +90,7 @@ class SplitDestination(Destination):
|
||||
filtered_df = df[df[self.split_column] == col_value]
|
||||
|
||||
dest_basename = dest_path / f"{self.name}-{col_value}"
|
||||
dest = writing_func(filtered_df, dest_basename)
|
||||
dest = self._write(filtered_df, dest_basename, writing_func)
|
||||
wrote_files.append(dest)
|
||||
|
||||
return wrote_files
|
||||
@@ -72,15 +102,6 @@ class Flux(BaseModel):
|
||||
destination: Destination
|
||||
|
||||
|
||||
def to_csv(df, dest_basename: Path) -> Path:
|
||||
dest = dest_basename.parent / (dest_basename.stem + ".csv")
|
||||
if dest.exists():
|
||||
df.to_csv(dest, mode="a", header=False, index=False)
|
||||
else:
|
||||
df.to_csv(dest, index=False)
|
||||
return dest
|
||||
|
||||
|
||||
def write_split_by(
|
||||
df: pd.DataFrame, column: str, dest_path: Path, name: str, writing_func
|
||||
) -> list[Path]:
|
||||
@@ -119,26 +140,38 @@ def split_duplicates(
|
||||
return no_duplicates, duplicated
|
||||
|
||||
|
||||
def consume_flux(
|
||||
name: str,
|
||||
flux: Flux,
|
||||
origin_path: Path,
|
||||
dest_path: Path,
|
||||
duplicated={},
|
||||
):
|
||||
logging.info(f"Consume {name}")
|
||||
src_df = []
|
||||
for filename, df in extract_sources(flux.sources, origin_path):
|
||||
logging.info(f"Extracting {filename}")
|
||||
df, duplicated = split_duplicates(df, str(filename), duplicated)
|
||||
src_df.append(df)
|
||||
|
||||
logging.info(f"Execute {flux.transformation.function.__name__}")
|
||||
df = flux.transformation.function(src_df, **flux.transformation.extra_kwrds)
|
||||
|
||||
files = flux.destination.write(df, dest_path)
|
||||
|
||||
logging.info(f"{files} written")
|
||||
return files
|
||||
|
||||
|
||||
def consume_fluxes(
|
||||
fluxes: dict[str, Flux],
|
||||
origin_path: Path,
|
||||
dest_path: Path,
|
||||
writing_func=to_csv,
|
||||
):
|
||||
duplicated = {}
|
||||
wrote_files = []
|
||||
|
||||
for name, flux in fluxes.items():
|
||||
logging.info(f"Consume {name}")
|
||||
src_df = []
|
||||
for filename, df in extract_sources(flux.sources, origin_path):
|
||||
logging.info(f"Extracting {filename}")
|
||||
df, duplicated = split_duplicates(df, str(filename), duplicated)
|
||||
src_df.append(df)
|
||||
|
||||
logging.info(f"Execute {flux.transformation.function.__name__}")
|
||||
df = flux.transformation.function(src_df, **flux.transformation.extra_kwrds)
|
||||
files = flux.destination.write(df, dest_path, writing_func)
|
||||
logging.info(f"{files} written")
|
||||
files = consume_flux(name, flux, origin_path, dest_path, duplicated)
|
||||
wrote_files += files
|
||||
return wrote_files
|
||||
|
@@ -11,6 +11,7 @@ from scripts.flux import (
|
||||
SplitDestination,
|
||||
Transformation,
|
||||
consume_fluxes,
|
||||
to_excel,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -19,9 +20,6 @@ logger.setLevel(logging.DEBUG)
|
||||
|
||||
def build_lots(dfs: list[pd.DataFrame]) -> pd.DataFrame:
|
||||
df = pd.concat(dfs)
|
||||
df = df.assign(
|
||||
Impact=df["Crédit"] - df["Débit"],
|
||||
)
|
||||
return df
|
||||
|
||||
|
||||
@@ -29,7 +27,9 @@ FLUXES_LOT = {
|
||||
"Lots": Flux(
|
||||
sources=[CSVSource(filename="CRG/crg-*.csv")],
|
||||
transformation=Transformation(function=build_lots),
|
||||
destination=SplitDestination(name="Lot/lot", split_column="Lot"),
|
||||
destination=SplitDestination(
|
||||
name="Lot/lot", split_column="Lot", writer=to_excel
|
||||
),
|
||||
),
|
||||
}
|
||||
|
||||
@@ -78,6 +78,8 @@ if __name__ == "__main__":
|
||||
pnl_fluxes = {}
|
||||
|
||||
files = consume_fluxes(
|
||||
fluxes=pnl_fluxes, origin_path=gold_path, dest_path=mart_path
|
||||
fluxes=pnl_fluxes,
|
||||
origin_path=gold_path,
|
||||
dest_path=mart_path,
|
||||
)
|
||||
print(files)
|
||||
|
@@ -12,9 +12,9 @@ logger.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
def extract_cat(cat: pd.DataFrame):
|
||||
cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"]["Anciennes"])
|
||||
cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"])
|
||||
# cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"]["Anciennes"])
|
||||
cat_trans = cat[cat["Nouvelles"] != "NE PAS IMPORTER"]
|
||||
|
||||
trans = {}
|
||||
for _, (old, new) in cat_trans.iterrows():
|
||||
trans[old] = new
|
||||
@@ -23,9 +23,11 @@ def extract_cat(cat: pd.DataFrame):
|
||||
|
||||
|
||||
def lot_naming(value):
|
||||
if str(value).isnumeric():
|
||||
return str(value).zfill(2)
|
||||
return "PC"
|
||||
try:
|
||||
v = int(value)
|
||||
except ValueError:
|
||||
return "PC"
|
||||
return str(v).zfill(2)
|
||||
|
||||
|
||||
def trans_2017_2021(
|
||||
@@ -99,7 +101,7 @@ def trans_2023(
|
||||
df = df.assign(
|
||||
Débit=df["Débit"].fillna(0),
|
||||
Crédit=df["Crédit"].fillna(0),
|
||||
Lot=lot_naming(df["Porte"]),
|
||||
Lot=df["Porte"].apply(lot_naming),
|
||||
Année=year,
|
||||
)
|
||||
return df[stagging_columns]
|
||||
@@ -138,7 +140,7 @@ FLUXES_CRG = {
|
||||
),
|
||||
"2022 - charge.xlsx": Flux(
|
||||
sources=[
|
||||
ExcelSource(filename="2022 - charge.xlsx", sheet_name="Sheet1"),
|
||||
ExcelSource(filename="2022 - charge.xlsx", sheet_name="DB CRG"),
|
||||
],
|
||||
transformation=Transformation(
|
||||
function=trans_2022_charge,
|
||||
|
0
tests/__init__.py
Normal file
0
tests/__init__.py
Normal file
0
tests/repository/__init__.py
Normal file
0
tests/repository/__init__.py
Normal file
BIN
tests/repository/fs_examples/salary.pdf
Normal file
BIN
tests/repository/fs_examples/salary.pdf
Normal file
Binary file not shown.
BIN
tests/repository/fs_examples/username-password-recovery-code.xls
Normal file
BIN
tests/repository/fs_examples/username-password-recovery-code.xls
Normal file
Binary file not shown.
Binary file not shown.
7
tests/repository/fs_examples/username.csv
Normal file
7
tests/repository/fs_examples/username.csv
Normal file
@@ -0,0 +1,7 @@
|
||||
Username;Identifier;First name;Last name
|
||||
booker12;9012;Rachel;Booker
|
||||
grey07;2070;Laura;Grey
|
||||
johnson81;4081;Craig;Johnson
|
||||
jenkins46;9346;Mary;Jenkins
|
||||
smith79;5079;Jamie;Smith
|
||||
|
|
84
tests/repository/test_fs_repository.py
Normal file
84
tests/repository/test_fs_repository.py
Normal file
@@ -0,0 +1,84 @@
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from pandas import pandas
|
||||
|
||||
from dashboard.libs.repository.fs_repository import FSRepository
|
||||
|
||||
EXAMPLE_DIR = "./tests/repository/fs_examples/"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def location(tmp_path):
|
||||
loc = tmp_path
|
||||
username_loc = loc / "username"
|
||||
username_loc.mkdir()
|
||||
salary_loc = loc / "salary"
|
||||
salary_loc.mkdir()
|
||||
example_src = Path(EXAMPLE_DIR)
|
||||
|
||||
for f in example_src.glob("*"):
|
||||
if "username" in str(f):
|
||||
shutil.copy(f, username_loc)
|
||||
else:
|
||||
shutil.copy(f, salary_loc)
|
||||
|
||||
return loc
|
||||
|
||||
|
||||
def test_init(location):
|
||||
repo = FSRepository("example", location)
|
||||
assert repo.ls() == [
|
||||
"username",
|
||||
"salary",
|
||||
]
|
||||
assert repo.schemas() == [
|
||||
".",
|
||||
"username",
|
||||
"salary",
|
||||
]
|
||||
|
||||
assert repo.tables() == []
|
||||
assert repo.tables("username") == [
|
||||
"username.csv",
|
||||
"username-password-recovery-code.xlsx",
|
||||
"username-password-recovery-code.xls",
|
||||
]
|
||||
assert repo.tables("salary") == ["salary.pdf"]
|
||||
|
||||
|
||||
def test_read_csv(location):
|
||||
repo = FSRepository("example", location)
|
||||
username = repo.read("username.csv", "username", delimiter=";")
|
||||
assert list(username.columns) == [
|
||||
"Username",
|
||||
"Identifier",
|
||||
"First name",
|
||||
"Last name",
|
||||
]
|
||||
assert len(username.index) == 5
|
||||
|
||||
|
||||
def test_fake_read_xlsx(location):
|
||||
repo = FSRepository("example", location)
|
||||
df = pandas.read_excel(
|
||||
location / "username" / "username-password-recovery-code.xls"
|
||||
)
|
||||
print(df)
|
||||
|
||||
|
||||
def test_read_xlsx(location):
|
||||
repo = FSRepository("example", location)
|
||||
username = repo.read("username-password-recovery-code.xls", "username")
|
||||
assert list(username.columns) == [
|
||||
"Username",
|
||||
"Identifier",
|
||||
"One-time password",
|
||||
"Recovery code",
|
||||
"First name",
|
||||
"Last name",
|
||||
"Department",
|
||||
"Location",
|
||||
]
|
||||
assert len(username.index) == 5
|
131
tests/test_flux.py
Normal file
131
tests/test_flux.py
Normal file
@@ -0,0 +1,131 @@
|
||||
import pandas as pd
|
||||
import pytest
|
||||
|
||||
from dashboard.libs.flux.flux import Flux, consume_flux
|
||||
from dashboard.libs.repository.repository import AbstractRepository
|
||||
|
||||
FakeTable = pd.DataFrame
|
||||
FakeSchema = dict[str, pd.DataFrame]
|
||||
FakeSchemas = dict[str, FakeSchema]
|
||||
|
||||
|
||||
class FakeRepository(AbstractRepository):
|
||||
def __init__(self, schemas: FakeSchemas):
|
||||
self._schemas = {}
|
||||
for schema_name, tables in schemas.items():
|
||||
schema = {}
|
||||
for table, df in tables.items():
|
||||
schema[table] = {
|
||||
"df": df,
|
||||
"metadata": {
|
||||
"status": "new",
|
||||
"qty_read": 0,
|
||||
"qty_write": 0,
|
||||
},
|
||||
}
|
||||
self._schemas[schema_name] = schema
|
||||
|
||||
def schemas(self):
|
||||
"""List schemas"""
|
||||
return list(self._schemas.keys())
|
||||
|
||||
def tables(self, schema):
|
||||
"""List table's name in schema"""
|
||||
return list(self._schemas[schema].keys())
|
||||
|
||||
def infos(self, table: str, schema: str) -> dict[str, str]:
|
||||
"""Get infos about the table"""
|
||||
return self._schemas[schema][table]["metadata"]
|
||||
|
||||
def read(self, table, schema) -> pd.DataFrame:
|
||||
"""Get content of the table"""
|
||||
self._schemas[schema][table]["metadata"]["qty_read"] += 1
|
||||
return self._schemas[schema][table]["df"]
|
||||
|
||||
def write(self, content, table, schema) -> dict[str, str]:
|
||||
"""Write content into the table"""
|
||||
try:
|
||||
self._schemas[schema][table]["df"] = content
|
||||
except KeyError:
|
||||
self._schemas[schema][table] = {
|
||||
"df": content,
|
||||
"metadata": {
|
||||
"status": "new",
|
||||
"qty_read": 0,
|
||||
"qty_write": 0,
|
||||
},
|
||||
}
|
||||
self._schemas[schema][table]["metadata"]["status"] = "modified"
|
||||
self._schemas[schema][table]["metadata"]["qty_write"] += 1
|
||||
return self.infos(table, schema)
|
||||
|
||||
def delete_table(self, table, schema):
|
||||
"""Delete the table"""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
def test_fakerepository():
|
||||
fakerepository = FakeRepository(
|
||||
{
|
||||
"foo": {
|
||||
"table1": pd.DataFrame({"A": []}),
|
||||
"table2": pd.DataFrame({"B": []}),
|
||||
},
|
||||
"bar": {
|
||||
"table1": pd.DataFrame({"C": []}),
|
||||
"table2": pd.DataFrame({"D": []}),
|
||||
},
|
||||
}
|
||||
)
|
||||
assert fakerepository.schemas() == ["foo", "bar"]
|
||||
assert fakerepository.tables("foo") == ["table1", "table2"]
|
||||
assert fakerepository.infos("table1", "foo") == {
|
||||
"status": "new",
|
||||
"qty_read": 0,
|
||||
"qty_write": 0,
|
||||
}
|
||||
assert fakerepository.read("table1", "foo").equals(pd.DataFrame({"A": []}))
|
||||
assert fakerepository.infos("table1", "foo") == {
|
||||
"status": "new",
|
||||
"qty_read": 1,
|
||||
"qty_write": 0,
|
||||
}
|
||||
|
||||
df = pd.DataFrame({"A": [1, 2]})
|
||||
assert fakerepository.write(df, "table1", "foo") == {
|
||||
"status": "modified",
|
||||
"qty_read": 1,
|
||||
"qty_write": 1,
|
||||
}
|
||||
|
||||
|
||||
def test_consume_flux():
|
||||
source_repository = FakeRepository(
|
||||
{
|
||||
"source": {
|
||||
"table1": pd.DataFrame({"A": [1, 2, 3]}),
|
||||
},
|
||||
}
|
||||
)
|
||||
dest_repository = FakeRepository(
|
||||
{
|
||||
"destination": {},
|
||||
}
|
||||
)
|
||||
repositories = {
|
||||
"source": source_repository,
|
||||
"dest": dest_repository,
|
||||
}
|
||||
transformation = lambda dfs: {"dest": dfs[0] * 2}
|
||||
|
||||
flux = Flux(
|
||||
sources=[{"repository": "source", "schema": "source", "table": "table1"}],
|
||||
destinations={
|
||||
"dest": {"repository": "dest", "schema": "destination", "table": "table1"}
|
||||
},
|
||||
transformation=transformation,
|
||||
)
|
||||
|
||||
state = consume_flux(flux, repositories)
|
||||
assert state.statuses["dest"] == {'status': 'modified', 'qty_read': 0, 'qty_write': 1}
|
||||
assert dest_repository.read("table1", "destination").equals(pd.DataFrame({"A": [2, 4, 6]}))
|
Reference in New Issue
Block a user