Compare commits

14 Commits

13 changed files with 280 additions and 6365 deletions

3
.gitignore vendored
View File

@@ -1,3 +1,4 @@
datas/
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
@@ -158,3 +159,5 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
*.duckdb

View File

@@ -42,8 +42,25 @@ clean_built:
rm -rf $(DATA_BASE)/staging/**/*.csv
rm -rf $(DATA_BASE)/gold/**/*.csv
rm -rf $(DATA_BASE)/datamart/**/*.csv
rm -rf $(DATA_BASE)/datamart/**/*.xlsx
run_ingest:
python -m scripts ingest
run_feature:
python -m scripts feature
run_datamart:
python -m scripts datamart
build: clean_built run_ingest run_feature run_datamart
clean_all: clean_built clean_raw
import_nextcloud:
rsync -a ~/Nextcloud/PLESNA\ Compta\ SYSTEM/Histoire/ ./datas/Histoire
rsync -av ~/Nextcloud/PLESNA\ Compta\ SYSTEM/Histoire/ ./datas/Histoire
push_nextcloud:
rsync -av ./datas/datamart/ ~/Nextcloud/PLESNA\ Compta\ SYSTEM/DataMart

33
dlt/pdf_pipeline.py Normal file
View File

@@ -0,0 +1,33 @@
import dlt
from pathlib import Path
from pdf_oralia.extract import from_pdf
import pdfplumber
DATA_PATH = Path("datas/")
assert DATA_PATH.exists()
RAW_CRG_PDF = DATA_PATH / "pdfs"
assert RAW_CRG_PDF.exists()
@dlt.resource(name="crg")
def crg_pdf(filename):
print(filename)
pdf = pdfplumber.open(filename)
try:
df_charge, df_loc = from_pdf(pdf)
except ValueError as e:
print(f"\tExtract Error: {e}")
pass
else:
for row in df_charge.to_dict("records"):
yield row
if __name__ == "__main__":
pipeline = dlt.pipeline(
pipeline_name='raw', destination="duckdb", dataset_name="crg"
)
for pdf_file in RAW_CRG_PDF.glob("**/*.pdf"):
load_info = pipeline.run(crg_pdf(pdf_file), table_name='charge')
print(load_info)

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,206 +0,0 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "bc224455-95ed-4e33-864d-442396301cd4",
"metadata": {},
"source": [
"# Staging vers Gold"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "d5dff9f3-ec7d-4fc7-8471-5ed1fbf6cf06",
"metadata": {},
"outputs": [],
"source": [
"from pathlib import Path\n",
"import pandas as pd"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "4e5779f6-e0ad-46f8-b684-49af4205f084",
"metadata": {},
"outputs": [],
"source": [
"staging_path = Path(\"../PLESNA Compta SYSTEM/staging\")\n",
"assert staging_path.exists()\n",
"gold_path = Path(\"../PLESNA Compta SYSTEM/gold\")\n",
"assert gold_path.exists()"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "2074af18-4f81-49cb-9d9c-f50e7408e7fc",
"metadata": {},
"outputs": [],
"source": [
"def to_csv(df, dest):\n",
" if dest.exists():\n",
" df.to_csv(dest, mode=\"a\", header=False, index=False)\n",
" else:\n",
" df.to_csv(dest, index=False)"
]
},
{
"cell_type": "markdown",
"id": "cc74ba91-855a-41e7-8709-122425f98fb6",
"metadata": {},
"source": [
"### clean gold"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "82de8bc5-8d1e-47fb-af28-076ed90835a9",
"metadata": {},
"outputs": [],
"source": [
"for f in gold_path.glob(\"**/*.csv\"):\n",
" f.unlink()"
]
},
{
"cell_type": "markdown",
"id": "539446e1-835e-4d79-a8d8-ddd5823f30f9",
"metadata": {},
"source": [
"## CRG"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "a6423b7d-657f-4897-8dd3-fbca68318367",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2020.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2018.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2022.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2021.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2023.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2019.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2017.csv')]\n"
]
}
],
"source": [
"crg_path = staging_path / \"CRG\"\n",
"assert crg_path.exists()\n",
"crg_files = list(crg_path.glob(\"*.csv\"))\n",
"print(crg_files)\n"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "edcf15c4-aa3c-40c7-805d-ae8933decf8c",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"../PLESNA Compta SYSTEM/gold/CRG/2020.csv\n",
"../PLESNA Compta SYSTEM/gold/CRG/2018.csv\n",
"../PLESNA Compta SYSTEM/gold/CRG/2022.csv\n",
"../PLESNA Compta SYSTEM/gold/CRG/2021.csv\n",
"../PLESNA Compta SYSTEM/gold/CRG/2023.csv\n",
"../PLESNA Compta SYSTEM/gold/CRG/2019.csv\n",
"../PLESNA Compta SYSTEM/gold/CRG/2017.csv\n"
]
}
],
"source": [
"for f in crg_files:\n",
" df = pd.read_csv(f)\n",
" df = df.assign(\n",
" Impact = df[\"Crédit\"] - df[\"Débit\"],\n",
" )\n",
" dest = gold_path / f\"CRG/{f.name}\"\n",
" print(dest)\n",
" to_csv(df, dest)"
]
},
{
"cell_type": "markdown",
"id": "811f6b89-be5a-4290-b3d5-466ec42eb3ae",
"metadata": {},
"source": [
"## Banque"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "c017b0a4-8c41-482e-85b1-4a10be84270b",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[PosixPath('../PLESNA Compta SYSTEM/staging/Banque/2020.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/Banque/2022.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/Banque/2021.csv')]\n"
]
}
],
"source": [
"banque_path = staging_path / \"Banque\"\n",
"assert banque_path.exists()\n",
"banque_files = list(banque_path.glob(\"*.csv\"))\n",
"print(banque_files)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "b04b0d11-dd74-4463-bd6f-c59528cc080e",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"../PLESNA Compta SYSTEM/gold/Banque/2020.csv\n",
"../PLESNA Compta SYSTEM/gold/Banque/2022.csv\n",
"../PLESNA Compta SYSTEM/gold/Banque/2021.csv\n"
]
}
],
"source": [
"for f in banque_files:\n",
" df = pd.read_csv(f)\n",
" df = df.assign(\n",
" Impact = df[\"Crédit\"] - df[\"Débit\"],\n",
" )\n",
" dest = gold_path / f\"Banque/{f.name}\"\n",
" print(dest)\n",
" to_csv(df, dest)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View File

@@ -2,3 +2,6 @@ jupyter==1.0.0
pandas==1.5.0
pdf-oralia==0.3.11
pydantic==2.6.1
click==8.1.7
dlt[duckdb]>=0.4.3a0
openpyxl>=3.0.0

90
scripts/__main__.py Normal file
View File

@@ -0,0 +1,90 @@
import logging
from logging.config import dictConfig
from pathlib import Path
import click
from .flux import consume_fluxes
DATA_PATH = Path("datas/")
assert DATA_PATH.exists()
HISTORY_PATH = DATA_PATH / "Histoire"
assert HISTORY_PATH.exists()
STAGING_PATH = DATA_PATH / "staging"
assert STAGING_PATH.exists()
GOLD_PATH = DATA_PATH / "gold"
assert GOLD_PATH.exists()
MART_PATH = DATA_PATH / "datamart"
assert MART_PATH.exists()
@click.group()
@click.option("--debug/--no-debug", default=False)
def main(debug):
if debug:
logging_level = logging.DEBUG
else:
logging_level = logging.INFO
logging_config = dict(
version=1,
formatters={"f": {"format": "%(levelname)-8s %(name)-12s %(message)s"}},
handlers={
"h": {
"class": "logging.StreamHandler",
"formatter": "f",
"level": logging_level,
}
},
root={
"handlers": ["h"],
"level": logging_level,
},
)
dictConfig(logging_config)
@main.command()
def ingest():
from .history_stagging import FLUXES_CRG
history_crg_path = HISTORY_PATH / "CRG"
assert history_crg_path.exists()
staging_crg_path = STAGING_PATH / "CRG"
assert staging_crg_path.exists()
consume_fluxes(
fluxes=FLUXES_CRG,
origin_path=history_crg_path,
dest_path=staging_crg_path,
)
@main.command()
def feature():
from .stagging_gold import FLUXES_CRG
staging_crg_path = STAGING_PATH / "CRG"
assert staging_crg_path.exists()
gold_crg_path = GOLD_PATH / "CRG"
assert gold_crg_path.exists()
consume_fluxes(
fluxes=FLUXES_CRG(staging_crg_path),
origin_path=staging_crg_path,
dest_path=gold_crg_path,
)
@main.command()
def datamart():
from .gold_mart import FLUXES_LOT
consume_fluxes(
fluxes=FLUXES_LOT,
origin_path=GOLD_PATH,
dest_path=MART_PATH,
)
if __name__ == "__main__":
main()

View File

@@ -4,10 +4,7 @@ from collections.abc import Callable
from pathlib import Path
import pandas as pd
from pydantic import BaseModel
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
from pydantic import BaseModel, Field
class Source(BaseModel):
@@ -23,7 +20,7 @@ class ExcelSource(Source):
def get_df(self, base_path: Path) -> pd.DataFrame:
filepath = base_path / self.filename
logger.debug(f"Get content of {filepath}")
logging.debug(f"Get content of {filepath}")
return pd.read_excel(filepath, sheet_name=self.sheet_name)
@@ -32,7 +29,7 @@ class CSVSource(Source):
def get_df(self, base_path: Path) -> pd.DataFrame:
filepath = base_path / self.filename
logger.debug(f"Get content of {filepath}")
logging.debug(f"Get content of {filepath}")
return pd.read_csv(filepath, **self.options)
@@ -41,40 +38,6 @@ class Transformation(BaseModel):
extra_kwrds: dict = {}
class Destination(BaseModel):
name: str
def write(
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable
) -> list[Path]:
dest_basename = dest_path / self.name
return [writing_func(df, dest_basename)]
class SplitDestination(Destination):
split_column: str
def write(
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable
) -> list[Path]:
wrote_files = []
for col_value in df[self.split_column].unique():
filtered_df = df[df[self.split_column] == col_value]
dest_basename = dest_path / f"{self.name}-{col_value}"
dest = writing_func(filtered_df, dest_basename)
wrote_files.append(dest)
return wrote_files
class Flux(BaseModel):
sources: list[Source]
transformation: Transformation
destination: Destination
def to_csv(df, dest_basename: Path) -> Path:
dest = dest_basename.parent / (dest_basename.stem + ".csv")
if dest.exists():
@@ -84,6 +47,61 @@ def to_csv(df, dest_basename: Path) -> Path:
return dest
def to_excel(df, dest_basename: Path) -> Path:
dest = dest_basename.parent / (dest_basename.stem + ".xlsx")
if dest.exists():
raise ValueError(f"The destination exits {dest}")
else:
df.to_excel(dest)
return dest
class Destination(BaseModel):
name: str
writer: Callable = Field(to_csv)
def _write(
self,
df: pd.DataFrame,
dest_basename: Path,
writing_func: Callable | None = None,
) -> Path:
if writing_func is None:
writing_func = self.writer
return writing_func(df, dest_basename)
def write(
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable | None = None
) -> list[Path]:
dest_basename = dest_path / self.name
return [self._write(df, dest_basename, writing_func)]
class SplitDestination(Destination):
split_column: str
def write(
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable | None = None
) -> list[Path]:
wrote_files = []
for col_value in df[self.split_column].unique():
filtered_df = df[df[self.split_column] == col_value]
dest_basename = dest_path / f"{self.name}-{col_value}"
dest = self._write(filtered_df, dest_basename, writing_func)
wrote_files.append(dest)
return wrote_files
class Flux(BaseModel):
sources: list[Source]
transformation: Transformation
destination: Destination
def write_split_by(
df: pd.DataFrame, column: str, dest_path: Path, name: str, writing_func
) -> list[Path]:
@@ -122,25 +140,38 @@ def split_duplicates(
return no_duplicates, duplicated
def consume_flux(
name: str,
flux: Flux,
origin_path: Path,
dest_path: Path,
duplicated={},
):
logging.info(f"Consume {name}")
src_df = []
for filename, df in extract_sources(flux.sources, origin_path):
logging.info(f"Extracting {filename}")
df, duplicated = split_duplicates(df, str(filename), duplicated)
src_df.append(df)
logging.info(f"Execute {flux.transformation.function.__name__}")
df = flux.transformation.function(src_df, **flux.transformation.extra_kwrds)
files = flux.destination.write(df, dest_path)
logging.info(f"{files} written")
return files
def consume_fluxes(
fluxes: dict[str, Flux],
origin_path: Path,
dest_path: Path,
writing_func=to_csv,
):
duplicated = {}
wrote_files = []
for name, flux in fluxes.items():
print(name)
logger.info(f"Processing flux {name}")
src_df = []
for filename, df in extract_sources(flux.sources, origin_path):
df, duplicated = split_duplicates(df, str(filename), duplicated)
src_df.append(df)
df = flux.transformation.function(src_df, **flux.transformation.extra_kwrds)
files = flux.destination.write(df, dest_path, writing_func)
files = consume_flux(name, flux, origin_path, dest_path, duplicated)
wrote_files += files
return wrote_files

View File

@@ -11,6 +11,7 @@ from scripts.flux import (
SplitDestination,
Transformation,
consume_fluxes,
to_excel,
)
logger = logging.getLogger(__name__)
@@ -18,13 +19,21 @@ logger.setLevel(logging.DEBUG)
def build_lots(dfs: list[pd.DataFrame]) -> pd.DataFrame:
df = dfs[0]
df = df.assign(
Impact=df["Crédit"] - df["Débit"],
)
df = pd.concat(dfs)
return df
FLUXES_LOT = {
"Lots": Flux(
sources=[CSVSource(filename="CRG/crg-*.csv")],
transformation=Transformation(function=build_lots),
destination=SplitDestination(
name="Lot/lot", split_column="Lot", writer=to_excel
),
),
}
def build_pnl(dfs: list[pd.DataFrame], year: int) -> pd.DataFrame:
df = pd.concat(dfs)
df = df[df["Année"] == year]
@@ -40,8 +49,10 @@ def build_pnl_flux(year: int) -> Flux:
CSVSource(filename=f"CRG/crg-{year}.csv"),
CSVSource(filename=f"banque/banque-{year}.csv"),
],
transformation=build_pnl,
extra_kwrds={"year": year},
transformation=Transformation(
function=build_pnl,
extra_kwrds={"year": year},
),
destination=Destination(name=f"pnl/{year}"),
)
@@ -58,18 +69,17 @@ if __name__ == "__main__":
mart_path = data_path / "datamart"
assert mart_path.exists()
lot_fluxes = {
"Lots": Flux(
sources=[CSVSource(filename="CRG/crg-*.csv")],
transformation=Transformation(function=build_lots),
destination=SplitDestination(name="Lot/lot", split_column="Lot"),
),
}
files = consume_fluxes(
fluxes=FLUXES_LOT, origin_path=gold_path, dest_path=mart_path
)
years = list(range(2017, 2024))
# pnl_fluxes = {f"pnl-{year}": build_pnl_flux(year) for year in years}
pnl_fluxes = {}
files = consume_fluxes(
fluxes={**lot_fluxes, **pnl_fluxes}, origin_path=gold_path, dest_path=mart_path
fluxes=pnl_fluxes,
origin_path=gold_path,
dest_path=mart_path,
)
print(files)

View File

@@ -12,9 +12,9 @@ logger.setLevel(logging.DEBUG)
def extract_cat(cat: pd.DataFrame):
cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"]["Anciennes"])
cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"])
# cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"]["Anciennes"])
cat_trans = cat[cat["Nouvelles"] != "NE PAS IMPORTER"]
trans = {}
for _, (old, new) in cat_trans.iterrows():
trans[old] = new
@@ -22,6 +22,14 @@ def extract_cat(cat: pd.DataFrame):
return trans, cat_drop
def lot_naming(value):
try:
v = int(value)
except ValueError:
return "PC"
return str(v).zfill(2)
def trans_2017_2021(
dfs: list[pd.DataFrame], stagging_columns: list[str], **kwrds
) -> pd.DataFrame:
@@ -35,7 +43,7 @@ def trans_2017_2021(
Porte=df["porte"],
Débit=df["Débit"].fillna(0),
Crédit=df["Crédit"].fillna(0),
Lot=df["immeuble"].astype(str) + df["porte"].astype("str").str.zfill(2),
Lot=df["porte"].apply(lot_naming),
Année=df["Date"].astype(str).str.slice(0, 4),
Mois=df["Date"].astype(str).str.slice(5, 7),
Catégorie=df["Categorie"].replace(cat_trans),
@@ -54,7 +62,7 @@ def trans_2022_charge(
Porte=df["lot"],
Débit=df["Débits"].fillna(0),
Crédit=df["Crédits"].fillna(0),
Lot=df["immeuble"].astype(str)[0] + df["lot"].astype("str").str.zfill(2),
Lot=df["lot"].apply(lot_naming),
Année=df["annee"],
Mois=df["mois"],
Catégorie=df["Catégorie Charge"],
@@ -75,7 +83,7 @@ def trans_2022_loc(
Porte=df["lot"],
Débit=0,
Crédit=df["Réglés"].fillna(0),
Lot=df["immeuble"].astype(str)[0] + df["lot"].astype("str").str.zfill(2),
Lot=df["lot"].apply(lot_naming),
Année=df["annee"],
Mois=df["mois"],
Catégorie="Loyer Charge",
@@ -93,7 +101,7 @@ def trans_2023(
df = df.assign(
Débit=df["Débit"].fillna(0),
Crédit=df["Crédit"].fillna(0),
Lot=df["Immeuble"].astype(str) + df["Porte"].astype("str").str.zfill(2),
Lot=df["Porte"].apply(lot_naming),
Année=year,
)
return df[stagging_columns]
@@ -113,7 +121,7 @@ STAGGING_COLUMNS = [
"Crédit",
]
FLUXES = {
FLUXES_CRG = {
"2017 2021 - charge et locataire.xlsx": Flux(
sources=[
ExcelSource(
@@ -132,7 +140,7 @@ FLUXES = {
),
"2022 - charge.xlsx": Flux(
sources=[
ExcelSource(filename="2022 - charge.xlsx", sheet_name="Sheet1"),
ExcelSource(filename="2022 - charge.xlsx", sheet_name="DB CRG"),
],
transformation=Transformation(
function=trans_2022_charge,
@@ -179,7 +187,7 @@ if __name__ == "__main__":
assert staging_crg_path.exists()
crg_files = consume_fluxes(
fluxes=FLUXES,
fluxes=FLUXES_CRG,
origin_path=history_crg_path,
dest_path=staging_crg_path,
)

View File

@@ -14,6 +14,7 @@ def feature_crg(dfs: list[pd.DataFrame]) -> pd.DataFrame:
df = dfs[0]
df = df.assign(
Impact=df["Crédit"] - df["Débit"],
Lot=df["Immeuble"].astype(str) + df["Lot"].astype("str"),
)
return df
@@ -47,6 +48,12 @@ def build_crg_fluxes(
return fluxes
def FLUXES_CRG(staging_crg_path: Path):
return build_crg_fluxes(
crg_path=staging_crg_path, pattern="*.csv", transformation=feature_crg
)
if __name__ == "__main__":
data_path = Path("datas/")
assert data_path.exists()
@@ -61,10 +68,9 @@ if __name__ == "__main__":
gold_crg_path = gold_path / "CRG"
assert gold_crg_path.exists()
fluxes = build_crg_fluxes(
crg_path=staging_crg_path, pattern="*.csv", transformation=feature_crg
)
crg_files = consume_fluxes(
fluxes=fluxes, origin_path=staging_crg_path, dest_path=gold_crg_path
fluxes=FLUXES_CRG(staging_crg_path),
origin_path=staging_crg_path,
dest_path=gold_crg_path,
)
print(crg_files)