Compare commits

6 Commits

11 changed files with 101 additions and 6326 deletions

.gitignore

@@ -1,3 +1,4 @@
datas/
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]


@@ -42,8 +42,25 @@ clean_built:
 	rm -rf $(DATA_BASE)/staging/**/*.csv
 	rm -rf $(DATA_BASE)/gold/**/*.csv
 	rm -rf $(DATA_BASE)/datamart/**/*.csv
+	rm -rf $(DATA_BASE)/datamart/**/*.xlsx
+
+run_ingest:
+	python -m scripts ingest
+
+run_feature:
+	python -m scripts feature
+
+run_datamart:
+	python -m scripts datamart
+
+build: clean_built run_ingest run_feature run_datamart
+
+clean_all: clean_built clean_raw
 
 import_nextcloud:
-	rsync -a ~/Nextcloud/PLESNA\ Compta\ SYSTEM/Histoire/ ./datas/Histoire
+	rsync -av ~/Nextcloud/PLESNA\ Compta\ SYSTEM/Histoire/ ./datas/Histoire
+
+push_nextcloud:
+	rsync -av ./datas/datamart/ ~/Nextcloud/PLESNA\ Compta\ SYSTEM/DataMart
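
Note: the new targets assume a click-based dispatcher in scripts/__main__.py (click is pinned in requirements.txt further down). Its layout is not part of this diff, so the sketch below is only a guess at the minimum that makes `python -m scripts ingest|feature|datamart` work; the `ingest` body is hypothetical.

    import click


    @click.group()
    def cli():
        """Dispatch for `python -m scripts <command>`."""


    @cli.command()
    def ingest():
        ...  # raw -> staging step; the real body lives in the scripts package


    if __name__ == "__main__":
        cli()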

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large

File diff suppressed because it is too large


@@ -1,206 +0,0 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "bc224455-95ed-4e33-864d-442396301cd4",
"metadata": {},
"source": [
"# Staging vers Gold"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "d5dff9f3-ec7d-4fc7-8471-5ed1fbf6cf06",
"metadata": {},
"outputs": [],
"source": [
"from pathlib import Path\n",
"import pandas as pd"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "4e5779f6-e0ad-46f8-b684-49af4205f084",
"metadata": {},
"outputs": [],
"source": [
"staging_path = Path(\"../PLESNA Compta SYSTEM/staging\")\n",
"assert staging_path.exists()\n",
"gold_path = Path(\"../PLESNA Compta SYSTEM/gold\")\n",
"assert gold_path.exists()"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "2074af18-4f81-49cb-9d9c-f50e7408e7fc",
"metadata": {},
"outputs": [],
"source": [
"def to_csv(df, dest):\n",
" if dest.exists():\n",
" df.to_csv(dest, mode=\"a\", header=False, index=False)\n",
" else:\n",
" df.to_csv(dest, index=False)"
]
},
{
"cell_type": "markdown",
"id": "cc74ba91-855a-41e7-8709-122425f98fb6",
"metadata": {},
"source": [
"### clean gold"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "82de8bc5-8d1e-47fb-af28-076ed90835a9",
"metadata": {},
"outputs": [],
"source": [
"for f in gold_path.glob(\"**/*.csv\"):\n",
" f.unlink()"
]
},
{
"cell_type": "markdown",
"id": "539446e1-835e-4d79-a8d8-ddd5823f30f9",
"metadata": {},
"source": [
"## CRG"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "a6423b7d-657f-4897-8dd3-fbca68318367",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2020.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2018.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2022.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2021.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2023.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2019.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2017.csv')]\n"
]
}
],
"source": [
"crg_path = staging_path / \"CRG\"\n",
"assert crg_path.exists()\n",
"crg_files = list(crg_path.glob(\"*.csv\"))\n",
"print(crg_files)\n"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "edcf15c4-aa3c-40c7-805d-ae8933decf8c",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"../PLESNA Compta SYSTEM/gold/CRG/2020.csv\n",
"../PLESNA Compta SYSTEM/gold/CRG/2018.csv\n",
"../PLESNA Compta SYSTEM/gold/CRG/2022.csv\n",
"../PLESNA Compta SYSTEM/gold/CRG/2021.csv\n",
"../PLESNA Compta SYSTEM/gold/CRG/2023.csv\n",
"../PLESNA Compta SYSTEM/gold/CRG/2019.csv\n",
"../PLESNA Compta SYSTEM/gold/CRG/2017.csv\n"
]
}
],
"source": [
"for f in crg_files:\n",
" df = pd.read_csv(f)\n",
" df = df.assign(\n",
" Impact = df[\"Crédit\"] - df[\"Débit\"],\n",
" )\n",
" dest = gold_path / f\"CRG/{f.name}\"\n",
" print(dest)\n",
" to_csv(df, dest)"
]
},
{
"cell_type": "markdown",
"id": "811f6b89-be5a-4290-b3d5-466ec42eb3ae",
"metadata": {},
"source": [
"## Banque"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "c017b0a4-8c41-482e-85b1-4a10be84270b",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[PosixPath('../PLESNA Compta SYSTEM/staging/Banque/2020.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/Banque/2022.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/Banque/2021.csv')]\n"
]
}
],
"source": [
"banque_path = staging_path / \"Banque\"\n",
"assert banque_path.exists()\n",
"banque_files = list(banque_path.glob(\"*.csv\"))\n",
"print(banque_files)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "b04b0d11-dd74-4463-bd6f-c59528cc080e",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"../PLESNA Compta SYSTEM/gold/Banque/2020.csv\n",
"../PLESNA Compta SYSTEM/gold/Banque/2022.csv\n",
"../PLESNA Compta SYSTEM/gold/Banque/2021.csv\n"
]
}
],
"source": [
"for f in banque_files:\n",
" df = pd.read_csv(f)\n",
" df = df.assign(\n",
" Impact = df[\"Crédit\"] - df[\"Débit\"],\n",
" )\n",
" dest = gold_path / f\"Banque/{f.name}\"\n",
" print(dest)\n",
" to_csv(df, dest)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
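
What the removed notebook did, condensed for reference: read each staging CSV, derive Impact = Crédit - Débit, and append the result into gold/. The same behaviour now lives in the scripts.flux pipeline further down, whose to_csv helper keeps the append-if-exists rule. Paths mirror the notebook's.

    from pathlib import Path

    import pandas as pd

    staging_path = Path("../PLESNA Compta SYSTEM/staging")
    gold_path = Path("../PLESNA Compta SYSTEM/gold")

    for sub in ("CRG", "Banque"):
        for src in (staging_path / sub).glob("*.csv"):
            df = pd.read_csv(src)
            df = df.assign(Impact=df["Crédit"] - df["Débit"])
            dest = gold_path / sub / src.name
            # append when a previous run already created the file
            df.to_csv(
                dest,
                mode="a" if dest.exists() else "w",
                header=not dest.exists(),
                index=False,
            )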


@@ -3,3 +3,5 @@ pandas==1.5.0
 pdf-oralia==0.3.11
 pydantic==2.6.1
 click==8.1.7
+dlt[duckdb]>=0.4.3a0
+openpyxl>=3.0.0
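
openpyxl backs the new to_excel writer introduced in scripts/flux.py below, since pandas delegates .xlsx writing to it; nothing in the visible hunks uses dlt[duckdb] yet. A one-line smoke test, assuming only these pins:

    import pandas as pd

    # pandas 1.5 writes .xlsx through openpyxl once it is installed
    pd.DataFrame({"Lot": ["01"], "Impact": [12.5]}).to_excel("smoke.xlsx", index=False)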


@@ -79,7 +79,11 @@ def feature():
 def datamart():
     from .gold_mart import FLUXES_LOT
 
-    consume_fluxes(fluxes=FLUXES_LOT, origin_path=GOLD_PATH, dest_path=MART_PATH)
+    consume_fluxes(
+        fluxes=FLUXES_LOT,
+        origin_path=GOLD_PATH,
+        dest_path=MART_PATH,
+    )
 
 
 if __name__ == "__main__":


@@ -4,7 +4,7 @@ from collections.abc import Callable
 from pathlib import Path
 
 import pandas as pd
-from pydantic import BaseModel
+from pydantic import BaseModel, Field
 
 
 class Source(BaseModel):
@@ -38,21 +38,51 @@ class Transformation(BaseModel):
     extra_kwrds: dict = {}
 
 
+def to_csv(df, dest_basename: Path) -> Path:
+    dest = dest_basename.parent / (dest_basename.stem + ".csv")
+    if dest.exists():
+        df.to_csv(dest, mode="a", header=False, index=False)
+    else:
+        df.to_csv(dest, index=False)
+    return dest
+
+
+def to_excel(df, dest_basename: Path) -> Path:
+    dest = dest_basename.parent / (dest_basename.stem + ".xlsx")
+    if dest.exists():
+        raise ValueError(f"The destination exists {dest}")
+    else:
+        df.to_excel(dest)
+    return dest
+
+
 class Destination(BaseModel):
     name: str
+    writer: Callable = Field(to_csv)
+
+    def _write(
+        self,
+        df: pd.DataFrame,
+        dest_basename: Path,
+        writing_func: Callable | None = None,
+    ) -> Path:
+        if writing_func is None:
+            writing_func = self.writer
+        return writing_func(df, dest_basename)
 
     def write(
-        self, df: pd.DataFrame, dest_path: Path, writing_func: Callable
+        self, df: pd.DataFrame, dest_path: Path, writing_func: Callable | None = None
     ) -> list[Path]:
         dest_basename = dest_path / self.name
-        return [writing_func(df, dest_basename)]
+        return [self._write(df, dest_basename, writing_func)]
 
 
 class SplitDestination(Destination):
     split_column: str
 
     def write(
-        self, df: pd.DataFrame, dest_path: Path, writing_func: Callable
+        self, df: pd.DataFrame, dest_path: Path, writing_func: Callable | None = None
     ) -> list[Path]:
         wrote_files = []
@@ -60,7 +90,7 @@ class SplitDestination(Destination):
             filtered_df = df[df[self.split_column] == col_value]
             dest_basename = dest_path / f"{self.name}-{col_value}"
 
-            dest = writing_func(filtered_df, dest_basename)
+            dest = self._write(filtered_df, dest_basename, writing_func)
             wrote_files.append(dest)
 
         return wrote_files
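
The refactor moves the format choice onto the destination itself: _write falls back to the instance's writer (to_csv by default) unless a writing_func is passed explicitly. A usage sketch assuming the class and helper names from the hunks above; the paths and data are illustrative only.

    from pathlib import Path

    import pandas as pd

    from scripts.flux import Destination, SplitDestination, to_excel

    df = pd.DataFrame({"Lot": ["01", "01", "02"], "Impact": [10.0, -4.0, 2.5]})

    # default writer -> appendable CSV at datamart/Lot/all.csv
    Destination(name="Lot/all").write(df, Path("datamart"))

    # per-lot Excel files -> datamart/Lot/lot-01.xlsx, datamart/Lot/lot-02.xlsx
    SplitDestination(name="Lot/lot", split_column="Lot", writer=to_excel).write(
        df, Path("datamart")
    )

    # an explicit writing_func still overrides the instance writer
    Destination(name="Lot/all").write(df, Path("datamart"), writing_func=to_excel)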
@@ -72,15 +102,6 @@ class Flux(BaseModel):
     destination: Destination
 
 
-def to_csv(df, dest_basename: Path) -> Path:
-    dest = dest_basename.parent / (dest_basename.stem + ".csv")
-    if dest.exists():
-        df.to_csv(dest, mode="a", header=False, index=False)
-    else:
-        df.to_csv(dest, index=False)
-    return dest
-
-
 def write_split_by(
     df: pd.DataFrame, column: str, dest_path: Path, name: str, writing_func
 ) -> list[Path]:
@@ -119,26 +140,38 @@ def split_duplicates(
     return no_duplicates, duplicated
 
 
+def consume_flux(
+    name: str,
+    flux: Flux,
+    origin_path: Path,
+    dest_path: Path,
+    duplicated={},
+):
+    logging.info(f"Consume {name}")
+    src_df = []
+    for filename, df in extract_sources(flux.sources, origin_path):
+        logging.info(f"Extracting {filename}")
+        df, duplicated = split_duplicates(df, str(filename), duplicated)
+        src_df.append(df)
+
+    logging.info(f"Execute {flux.transformation.function.__name__}")
+    df = flux.transformation.function(src_df, **flux.transformation.extra_kwrds)
+
+    files = flux.destination.write(df, dest_path)
+    logging.info(f"{files} written")
+    return files
+
+
 def consume_fluxes(
     fluxes: dict[str, Flux],
     origin_path: Path,
     dest_path: Path,
-    writing_func=to_csv,
 ):
     duplicated = {}
     wrote_files = []
 
     for name, flux in fluxes.items():
-        logging.info(f"Consume {name}")
-        src_df = []
-        for filename, df in extract_sources(flux.sources, origin_path):
-            logging.info(f"Extracting {filename}")
-            df, duplicated = split_duplicates(df, str(filename), duplicated)
-            src_df.append(df)
-        logging.info(f"Execute {flux.transformation.function.__name__}")
-        df = flux.transformation.function(src_df, **flux.transformation.extra_kwrds)
-        files = flux.destination.write(df, dest_path, writing_func)
-        logging.info(f"{files} written")
+        files = consume_flux(name, flux, origin_path, dest_path, duplicated)
         wrote_files += files
 
     return wrote_files
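
Extracting consume_flux makes a single flux runnable outside the main loop, while the caller keeps ownership of the duplicated index so de-duplication stays global across fluxes. A hypothetical one-flux run; the paths are assumed from the Makefile's datas/ layout, not stated in this diff.

    from pathlib import Path

    from scripts.flux import consume_flux
    from scripts.gold_mart import FLUXES_LOT

    duplicated = {}  # shared across calls so duplicate rows are tracked globally
    files = consume_flux(
        "Lots",
        FLUXES_LOT["Lots"],
        origin_path=Path("datas/gold"),    # assumed value of GOLD_PATH
        dest_path=Path("datas/datamart"),  # assumed value of MART_PATH
        duplicated=duplicated,
    )
    print(files)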


@@ -11,6 +11,7 @@ from scripts.flux import (
     SplitDestination,
     Transformation,
     consume_fluxes,
+    to_excel,
 )
 
 logger = logging.getLogger(__name__)
@@ -19,9 +20,6 @@ logger.setLevel(logging.DEBUG)
 def build_lots(dfs: list[pd.DataFrame]) -> pd.DataFrame:
     df = pd.concat(dfs)
-    df = df.assign(
-        Impact=df["Crédit"] - df["Débit"],
-    )
     return df
@@ -29,7 +27,9 @@ FLUXES_LOT = {
"Lots": Flux(
sources=[CSVSource(filename="CRG/crg-*.csv")],
transformation=Transformation(function=build_lots),
destination=SplitDestination(name="Lot/lot", split_column="Lot"),
destination=SplitDestination(
name="Lot/lot", split_column="Lot", writer=to_excel
),
),
}
@@ -78,6 +78,8 @@ if __name__ == "__main__":
     pnl_fluxes = {}
 
     files = consume_fluxes(
-        fluxes=pnl_fluxes, origin_path=gold_path, dest_path=mart_path
+        fluxes=pnl_fluxes,
+        origin_path=gold_path,
+        dest_path=mart_path,
     )
     print(files)
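
Two consequences of this file's hunks are worth spelling out: build_lots no longer recomputes Impact (the gold CSVs already carry it), and because to_excel refuses to overwrite, a rebuild depends on clean_built clearing datamart/**/*.xlsx first (the Makefile hunk above). The no-overwrite contract, illustrated with a throwaway path:

    from pathlib import Path

    import pandas as pd

    from scripts.flux import to_excel

    df = pd.DataFrame({"Lot": ["01"]})
    to_excel(df, Path("lot-01"))      # writes lot-01.xlsx
    try:
        to_excel(df, Path("lot-01"))  # second write refuses to clobber
    except ValueError as err:
        print(err)                    # The destination exists lot-01.xlsx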


@@ -12,9 +12,9 @@ logger.setLevel(logging.DEBUG)
 def extract_cat(cat: pd.DataFrame):
-    cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"]["Anciennes"])
+    cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"])
+    # cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"]["Anciennes"])
     cat_trans = cat[cat["Nouvelles"] != "NE PAS IMPORTER"]
 
     trans = {}
     for _, (old, new) in cat_trans.iterrows():
         trans[old] = new
@@ -23,9 +23,11 @@ def extract_cat(cat: pd.DataFrame):
 def lot_naming(value):
-    if str(value).isnumeric():
-        return str(value).zfill(2)
-    return "PC"
+    try:
+        v = int(value)
+    except ValueError:
+        return "PC"
+    return str(v).zfill(2)
 
 
 def trans_2017_2021(
@@ -99,7 +101,7 @@ def trans_2023(
     df = df.assign(
         Débit=df["Débit"].fillna(0),
         Crédit=df["Crédit"].fillna(0),
-        Lot=lot_naming(df["Porte"]),
+        Lot=df["Porte"].apply(lot_naming),
         Année=year,
     )
     return df[stagging_columns]
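
Both lot_naming hunks address the same failure: Excel-sourced Porte values arrive as floats (7.0), which str.isnumeric() rejected, and trans_2023 also called lot_naming on the whole column at once, stringifying the Series so every row fell back to "PC". Expected behaviour of the try/int version applied elementwise; the import path is hypothetical, since the file name is not visible in this diff.

    import pandas as pd

    from scripts.stagging import lot_naming  # hypothetical module path

    assert lot_naming(3) == "03"
    assert lot_naming("12") == "12"
    assert lot_naming(7.0) == "07"   # float from Excel; the old version gave "PC"
    assert lot_naming("PC") == "PC"  # int("PC") raises ValueError -> fallback

    pd.Series([3, 7.0, "PC"]).apply(lot_naming)  # -> ["03", "07", "PC"]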
@@ -138,7 +140,7 @@ FLUXES_CRG = {
     ),
     "2022 - charge.xlsx": Flux(
         sources=[
-            ExcelSource(filename="2022 - charge.xlsx", sheet_name="Sheet1"),
+            ExcelSource(filename="2022 - charge.xlsx", sheet_name="DB CRG"),
         ],
         transformation=Transformation(
             function=trans_2022_charge,
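
The sheet_name switch matters because pandas loads exactly the sheet it is told to, so "Sheet1" presumably pointed at a leftover default tab rather than the charge data. Assuming ExcelSource forwards sheet_name straight to pandas:

    import pandas as pd

    # read the named "DB CRG" tab rather than the default "Sheet1"
    df = pd.read_excel("2022 - charge.xlsx", sheet_name="DB CRG")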