Compare commits
8 Commits
e2805f9af2
...
dbt-dlt
Author | SHA1 | Date | |
---|---|---|---|
4249e902b2 | |||
ab36931c06 | |||
1ed6ed43ed | |||
215e26b84f | |||
b60fa3be17 | |||
a1578f813b | |||
d872cd7681 | |||
bfebd6b58a |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -1,3 +1,4 @@
|
||||
datas/
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
@@ -158,3 +159,5 @@ cython_debug/
|
||||
# and can be added to the global gitignore or merged into this file. For a more nuclear
|
||||
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
||||
#.idea/
|
||||
|
||||
*.duckdb
|
||||
|
19
Makefile
19
Makefile
@@ -42,8 +42,25 @@ clean_built:
|
||||
rm -rf $(DATA_BASE)/staging/**/*.csv
|
||||
rm -rf $(DATA_BASE)/gold/**/*.csv
|
||||
rm -rf $(DATA_BASE)/datamart/**/*.csv
|
||||
rm -rf $(DATA_BASE)/datamart/**/*.xlsx
|
||||
|
||||
run_ingest:
|
||||
python -m scripts ingest
|
||||
|
||||
run_feature:
|
||||
python -m scripts feature
|
||||
|
||||
run_datamart:
|
||||
python -m scripts datamart
|
||||
|
||||
build: clean_built run_ingest run_feature run_datamart
|
||||
|
||||
clean_all: clean_built clean_raw
|
||||
|
||||
import_nextcloud:
|
||||
rsync -a ~/Nextcloud/PLESNA\ Compta\ SYSTEM/Histoire/ ./datas/Histoire
|
||||
rsync -av ~/Nextcloud/PLESNA\ Compta\ SYSTEM/Histoire/ ./datas/Histoire
|
||||
|
||||
push_nextcloud:
|
||||
rsync -av ./datas/datamart/ ~/Nextcloud/PLESNA\ Compta\ SYSTEM/DataMart
|
||||
|
||||
|
||||
|
33
dlt/pdf_pipeline.py
Normal file
33
dlt/pdf_pipeline.py
Normal file
@@ -0,0 +1,33 @@
|
||||
import dlt
|
||||
from pathlib import Path
|
||||
from pdf_oralia.extract import from_pdf
|
||||
import pdfplumber
|
||||
|
||||
DATA_PATH = Path("datas/")
|
||||
assert DATA_PATH.exists()
|
||||
RAW_CRG_PDF = DATA_PATH / "pdfs"
|
||||
assert RAW_CRG_PDF.exists()
|
||||
|
||||
|
||||
@dlt.resource(name="crg")
|
||||
def crg_pdf(filename):
|
||||
print(filename)
|
||||
pdf = pdfplumber.open(filename)
|
||||
try:
|
||||
df_charge, df_loc = from_pdf(pdf)
|
||||
except ValueError as e:
|
||||
print(f"\tExtract Error: {e}")
|
||||
pass
|
||||
else:
|
||||
for row in df_charge.to_dict("records"):
|
||||
yield row
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pipeline = dlt.pipeline(
|
||||
pipeline_name='raw', destination="duckdb", dataset_name="crg"
|
||||
)
|
||||
|
||||
for pdf_file in RAW_CRG_PDF.glob("**/*.pdf"):
|
||||
load_info = pipeline.run(crg_pdf(pdf_file), table_name='charge')
|
||||
print(load_info)
|
File diff suppressed because one or more lines are too long
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@@ -1,206 +0,0 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"id": "bc224455-95ed-4e33-864d-442396301cd4",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# Staging vers Gold"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 1,
|
||||
"id": "d5dff9f3-ec7d-4fc7-8471-5ed1fbf6cf06",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"from pathlib import Path\n",
|
||||
"import pandas as pd"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 2,
|
||||
"id": "4e5779f6-e0ad-46f8-b684-49af4205f084",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"staging_path = Path(\"../PLESNA Compta SYSTEM/staging\")\n",
|
||||
"assert staging_path.exists()\n",
|
||||
"gold_path = Path(\"../PLESNA Compta SYSTEM/gold\")\n",
|
||||
"assert gold_path.exists()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 3,
|
||||
"id": "2074af18-4f81-49cb-9d9c-f50e7408e7fc",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"def to_csv(df, dest):\n",
|
||||
" if dest.exists():\n",
|
||||
" df.to_csv(dest, mode=\"a\", header=False, index=False)\n",
|
||||
" else:\n",
|
||||
" df.to_csv(dest, index=False)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"id": "cc74ba91-855a-41e7-8709-122425f98fb6",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"### clean gold"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 4,
|
||||
"id": "82de8bc5-8d1e-47fb-af28-076ed90835a9",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"for f in gold_path.glob(\"**/*.csv\"):\n",
|
||||
" f.unlink()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"id": "539446e1-835e-4d79-a8d8-ddd5823f30f9",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## CRG"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 5,
|
||||
"id": "a6423b7d-657f-4897-8dd3-fbca68318367",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"[PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2020.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2018.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2022.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2021.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2023.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2019.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2017.csv')]\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"crg_path = staging_path / \"CRG\"\n",
|
||||
"assert crg_path.exists()\n",
|
||||
"crg_files = list(crg_path.glob(\"*.csv\"))\n",
|
||||
"print(crg_files)\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 6,
|
||||
"id": "edcf15c4-aa3c-40c7-805d-ae8933decf8c",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"../PLESNA Compta SYSTEM/gold/CRG/2020.csv\n",
|
||||
"../PLESNA Compta SYSTEM/gold/CRG/2018.csv\n",
|
||||
"../PLESNA Compta SYSTEM/gold/CRG/2022.csv\n",
|
||||
"../PLESNA Compta SYSTEM/gold/CRG/2021.csv\n",
|
||||
"../PLESNA Compta SYSTEM/gold/CRG/2023.csv\n",
|
||||
"../PLESNA Compta SYSTEM/gold/CRG/2019.csv\n",
|
||||
"../PLESNA Compta SYSTEM/gold/CRG/2017.csv\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"for f in crg_files:\n",
|
||||
" df = pd.read_csv(f)\n",
|
||||
" df = df.assign(\n",
|
||||
" Impact = df[\"Crédit\"] - df[\"Débit\"],\n",
|
||||
" )\n",
|
||||
" dest = gold_path / f\"CRG/{f.name}\"\n",
|
||||
" print(dest)\n",
|
||||
" to_csv(df, dest)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"id": "811f6b89-be5a-4290-b3d5-466ec42eb3ae",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Banque"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 7,
|
||||
"id": "c017b0a4-8c41-482e-85b1-4a10be84270b",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"[PosixPath('../PLESNA Compta SYSTEM/staging/Banque/2020.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/Banque/2022.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/Banque/2021.csv')]\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"banque_path = staging_path / \"Banque\"\n",
|
||||
"assert banque_path.exists()\n",
|
||||
"banque_files = list(banque_path.glob(\"*.csv\"))\n",
|
||||
"print(banque_files)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 8,
|
||||
"id": "b04b0d11-dd74-4463-bd6f-c59528cc080e",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"../PLESNA Compta SYSTEM/gold/Banque/2020.csv\n",
|
||||
"../PLESNA Compta SYSTEM/gold/Banque/2022.csv\n",
|
||||
"../PLESNA Compta SYSTEM/gold/Banque/2021.csv\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"for f in banque_files:\n",
|
||||
" df = pd.read_csv(f)\n",
|
||||
" df = df.assign(\n",
|
||||
" Impact = df[\"Crédit\"] - df[\"Débit\"],\n",
|
||||
" )\n",
|
||||
" dest = gold_path / f\"Banque/{f.name}\"\n",
|
||||
" print(dest)\n",
|
||||
" to_csv(df, dest)"
|
||||
]
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "Python 3 (ipykernel)",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
"language_info": {
|
||||
"codemirror_mode": {
|
||||
"name": "ipython",
|
||||
"version": 3
|
||||
},
|
||||
"file_extension": ".py",
|
||||
"mimetype": "text/x-python",
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.11.6"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 5
|
||||
}
|
@@ -3,3 +3,5 @@ pandas==1.5.0
|
||||
pdf-oralia==0.3.11
|
||||
pydantic==2.6.1
|
||||
click==8.1.7
|
||||
dlt[duckdb]>=0.4.3a0
|
||||
openpyxl>=3.0.0
|
||||
|
@@ -79,7 +79,11 @@ def feature():
|
||||
def datamart():
|
||||
from .gold_mart import FLUXES_LOT
|
||||
|
||||
consume_fluxes(fluxes=FLUXES_LOT, origin_path=GOLD_PATH, dest_path=MART_PATH)
|
||||
consume_fluxes(
|
||||
fluxes=FLUXES_LOT,
|
||||
origin_path=GOLD_PATH,
|
||||
dest_path=MART_PATH,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@@ -4,7 +4,7 @@ from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
|
||||
import pandas as pd
|
||||
from pydantic import BaseModel
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class Source(BaseModel):
|
||||
@@ -38,21 +38,51 @@ class Transformation(BaseModel):
|
||||
extra_kwrds: dict = {}
|
||||
|
||||
|
||||
def to_csv(df, dest_basename: Path) -> Path:
|
||||
dest = dest_basename.parent / (dest_basename.stem + ".csv")
|
||||
if dest.exists():
|
||||
df.to_csv(dest, mode="a", header=False, index=False)
|
||||
else:
|
||||
df.to_csv(dest, index=False)
|
||||
return dest
|
||||
|
||||
|
||||
def to_excel(df, dest_basename: Path) -> Path:
|
||||
dest = dest_basename.parent / (dest_basename.stem + ".xlsx")
|
||||
if dest.exists():
|
||||
raise ValueError(f"The destination exits {dest}")
|
||||
else:
|
||||
df.to_excel(dest)
|
||||
return dest
|
||||
|
||||
|
||||
class Destination(BaseModel):
|
||||
name: str
|
||||
writer: Callable = Field(to_csv)
|
||||
|
||||
def _write(
|
||||
self,
|
||||
df: pd.DataFrame,
|
||||
dest_basename: Path,
|
||||
writing_func: Callable | None = None,
|
||||
) -> Path:
|
||||
if writing_func is None:
|
||||
writing_func = self.writer
|
||||
|
||||
return writing_func(df, dest_basename)
|
||||
|
||||
def write(
|
||||
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable
|
||||
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable | None = None
|
||||
) -> list[Path]:
|
||||
dest_basename = dest_path / self.name
|
||||
return [writing_func(df, dest_basename)]
|
||||
return [self._write(df, dest_basename, writing_func)]
|
||||
|
||||
|
||||
class SplitDestination(Destination):
|
||||
split_column: str
|
||||
|
||||
def write(
|
||||
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable
|
||||
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable | None = None
|
||||
) -> list[Path]:
|
||||
wrote_files = []
|
||||
|
||||
@@ -60,7 +90,7 @@ class SplitDestination(Destination):
|
||||
filtered_df = df[df[self.split_column] == col_value]
|
||||
|
||||
dest_basename = dest_path / f"{self.name}-{col_value}"
|
||||
dest = writing_func(filtered_df, dest_basename)
|
||||
dest = self._write(filtered_df, dest_basename, writing_func)
|
||||
wrote_files.append(dest)
|
||||
|
||||
return wrote_files
|
||||
@@ -72,15 +102,6 @@ class Flux(BaseModel):
|
||||
destination: Destination
|
||||
|
||||
|
||||
def to_csv(df, dest_basename: Path) -> Path:
|
||||
dest = dest_basename.parent / (dest_basename.stem + ".csv")
|
||||
if dest.exists():
|
||||
df.to_csv(dest, mode="a", header=False, index=False)
|
||||
else:
|
||||
df.to_csv(dest, index=False)
|
||||
return dest
|
||||
|
||||
|
||||
def write_split_by(
|
||||
df: pd.DataFrame, column: str, dest_path: Path, name: str, writing_func
|
||||
) -> list[Path]:
|
||||
@@ -119,26 +140,38 @@ def split_duplicates(
|
||||
return no_duplicates, duplicated
|
||||
|
||||
|
||||
def consume_flux(
|
||||
name: str,
|
||||
flux: Flux,
|
||||
origin_path: Path,
|
||||
dest_path: Path,
|
||||
duplicated={},
|
||||
):
|
||||
logging.info(f"Consume {name}")
|
||||
src_df = []
|
||||
for filename, df in extract_sources(flux.sources, origin_path):
|
||||
logging.info(f"Extracting {filename}")
|
||||
df, duplicated = split_duplicates(df, str(filename), duplicated)
|
||||
src_df.append(df)
|
||||
|
||||
logging.info(f"Execute {flux.transformation.function.__name__}")
|
||||
df = flux.transformation.function(src_df, **flux.transformation.extra_kwrds)
|
||||
|
||||
files = flux.destination.write(df, dest_path)
|
||||
|
||||
logging.info(f"{files} written")
|
||||
return files
|
||||
|
||||
|
||||
def consume_fluxes(
|
||||
fluxes: dict[str, Flux],
|
||||
origin_path: Path,
|
||||
dest_path: Path,
|
||||
writing_func=to_csv,
|
||||
):
|
||||
duplicated = {}
|
||||
wrote_files = []
|
||||
|
||||
for name, flux in fluxes.items():
|
||||
logging.info(f"Consume {name}")
|
||||
src_df = []
|
||||
for filename, df in extract_sources(flux.sources, origin_path):
|
||||
logging.info(f"Extracting {filename}")
|
||||
df, duplicated = split_duplicates(df, str(filename), duplicated)
|
||||
src_df.append(df)
|
||||
|
||||
logging.info(f"Execute {flux.transformation.function.__name__}")
|
||||
df = flux.transformation.function(src_df, **flux.transformation.extra_kwrds)
|
||||
files = flux.destination.write(df, dest_path, writing_func)
|
||||
logging.info(f"{files} written")
|
||||
files = consume_flux(name, flux, origin_path, dest_path, duplicated)
|
||||
wrote_files += files
|
||||
return wrote_files
|
||||
|
@@ -11,6 +11,7 @@ from scripts.flux import (
|
||||
SplitDestination,
|
||||
Transformation,
|
||||
consume_fluxes,
|
||||
to_excel,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -19,9 +20,6 @@ logger.setLevel(logging.DEBUG)
|
||||
|
||||
def build_lots(dfs: list[pd.DataFrame]) -> pd.DataFrame:
|
||||
df = pd.concat(dfs)
|
||||
df = df.assign(
|
||||
Impact=df["Crédit"] - df["Débit"],
|
||||
)
|
||||
return df
|
||||
|
||||
|
||||
@@ -29,7 +27,9 @@ FLUXES_LOT = {
|
||||
"Lots": Flux(
|
||||
sources=[CSVSource(filename="CRG/crg-*.csv")],
|
||||
transformation=Transformation(function=build_lots),
|
||||
destination=SplitDestination(name="Lot/lot", split_column="Lot"),
|
||||
destination=SplitDestination(
|
||||
name="Lot/lot", split_column="Lot", writer=to_excel
|
||||
),
|
||||
),
|
||||
}
|
||||
|
||||
@@ -78,6 +78,8 @@ if __name__ == "__main__":
|
||||
pnl_fluxes = {}
|
||||
|
||||
files = consume_fluxes(
|
||||
fluxes=pnl_fluxes, origin_path=gold_path, dest_path=mart_path
|
||||
fluxes=pnl_fluxes,
|
||||
origin_path=gold_path,
|
||||
dest_path=mart_path,
|
||||
)
|
||||
print(files)
|
||||
|
@@ -12,9 +12,9 @@ logger.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
def extract_cat(cat: pd.DataFrame):
|
||||
cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"]["Anciennes"])
|
||||
cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"])
|
||||
# cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"]["Anciennes"])
|
||||
cat_trans = cat[cat["Nouvelles"] != "NE PAS IMPORTER"]
|
||||
|
||||
trans = {}
|
||||
for _, (old, new) in cat_trans.iterrows():
|
||||
trans[old] = new
|
||||
@@ -23,9 +23,11 @@ def extract_cat(cat: pd.DataFrame):
|
||||
|
||||
|
||||
def lot_naming(value):
|
||||
if str(value).isnumeric():
|
||||
return str(value).zfill(2)
|
||||
return "PC"
|
||||
try:
|
||||
v = int(value)
|
||||
except ValueError:
|
||||
return "PC"
|
||||
return str(v).zfill(2)
|
||||
|
||||
|
||||
def trans_2017_2021(
|
||||
@@ -99,7 +101,7 @@ def trans_2023(
|
||||
df = df.assign(
|
||||
Débit=df["Débit"].fillna(0),
|
||||
Crédit=df["Crédit"].fillna(0),
|
||||
Lot=lot_naming(df["Porte"]),
|
||||
Lot=df["Porte"].apply(lot_naming),
|
||||
Année=year,
|
||||
)
|
||||
return df[stagging_columns]
|
||||
@@ -138,7 +140,7 @@ FLUXES_CRG = {
|
||||
),
|
||||
"2022 - charge.xlsx": Flux(
|
||||
sources=[
|
||||
ExcelSource(filename="2022 - charge.xlsx", sheet_name="Sheet1"),
|
||||
ExcelSource(filename="2022 - charge.xlsx", sheet_name="DB CRG"),
|
||||
],
|
||||
transformation=Transformation(
|
||||
function=trans_2022_charge,
|
||||
|
Reference in New Issue
Block a user