Compare commits
22 Commits
2f77206b8f
...
main
Author | SHA1 | Date | |
---|---|---|---|
1ed6ed43ed | |||
215e26b84f | |||
b60fa3be17 | |||
a1578f813b | |||
d872cd7681 | |||
bfebd6b58a | |||
e2805f9af2 | |||
98691d5531 | |||
c6932c364b | |||
05430196d0 | |||
78576270db | |||
4cc9e7b038 | |||
dd0d8af40c | |||
fcff40adb7 | |||
dec284bde1 | |||
d0961b0909 | |||
25ede1789a | |||
9e5541a770 | |||
bd866dda36 | |||
f56edac92c | |||
3916915e22 | |||
b62ea3f5ae |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,3 +1,4 @@
|
||||
datas/
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
|
62
Makefile
62
Makefile
@@ -1,10 +1,66 @@
|
||||
DATA_BASE=./datas
|
||||
|
||||
PDF_BASE=$(DATA_BASE)/pdfs
|
||||
PDF_YEARS=$(wildcard $(PDF_BASE)/*)
|
||||
|
||||
RAW_BASE=$(DATA_BASE)/raw
|
||||
RAW_CRG=$(RAW_BASE)/CRG
|
||||
RAW_CRG_YEARS=$(subst $(PDF_BASE), $(RAW_CRG), $(PDF_YEARS))
|
||||
|
||||
|
||||
$(RAW_CRG)/%/: $(wildcard $(PDF_BASE)/%/*)
|
||||
echo $(wildcard $(PDF_BASE)/$*/*)
|
||||
@echo ----
|
||||
ls $(PDF_BASE)/$*/
|
||||
@echo ----
|
||||
echo $*
|
||||
@echo ----
|
||||
echo $^
|
||||
@echo ----
|
||||
echo $?
|
||||
|
||||
#./datas/raw/CRG/%:
|
||||
#pdf-oralia extract all --src $$year --dest $$(subst $$PDF_BASE, $$RAW_CRG, $$year)
|
||||
# $(RAW_CRG_YEARS): $(PDF_PATHS)
|
||||
# for year in $(PDF_PATHS); do \
|
||||
# echo $$year; \
|
||||
# echo $$(subst $$PDF_BASE, $$RAW_CRG, $$year); \
|
||||
# echo "----"; \
|
||||
# done;
|
||||
|
||||
extract_pdfs:
|
||||
for year in 2021 2022 2023 2024; do \
|
||||
mkdir -p $(RAW_CRG)/$$year/extracted;\
|
||||
pdf-oralia extract all --src $(PDF_BASE)/$$year/ --dest $(RAW_CRG)/$$year/extracted; \
|
||||
pdf-oralia join --src $(RAW_CRG)/$$year/extracted/ --dest $(RAW_CRG)/$$year/; \
|
||||
done
|
||||
|
||||
clean_raw:
|
||||
rm -rf ./PLESNA Compta SYSTEM/raw/**/*.csv
|
||||
|
||||
clean_built:
|
||||
rm -rf ./PLESNA Compta SYSTEM/staging/**/*.csv
|
||||
rm -rf ./PLESNA Compta SYSTEM/gold/**/*.csv
|
||||
rm -rf ./PLESNA Compta SYSTEM/datamart/**/*.csv
|
||||
rm -rf $(DATA_BASE)/staging/**/*.csv
|
||||
rm -rf $(DATA_BASE)/gold/**/*.csv
|
||||
rm -rf $(DATA_BASE)/datamart/**/*.csv
|
||||
rm -rf $(DATA_BASE)/datamart/**/*.xlsx
|
||||
|
||||
run_ingest:
|
||||
python -m scripts ingest
|
||||
|
||||
run_feature:
|
||||
python -m scripts feature
|
||||
|
||||
run_datamart:
|
||||
python -m scripts datamart
|
||||
|
||||
build: clean_built run_ingest run_feature run_datamart
|
||||
|
||||
clean_all: clean_built clean_raw
|
||||
|
||||
import_nextcloud:
|
||||
rsync -av ~/Nextcloud/PLESNA\ Compta\ SYSTEM/Histoire/ ./datas/Histoire
|
||||
|
||||
push_nextcloud:
|
||||
rsync -av ./datas/datamart/ ~/Nextcloud/PLESNA\ Compta\ SYSTEM/DataMart
|
||||
|
||||
|
||||
|
File diff suppressed because one or more lines are too long
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@@ -1,206 +0,0 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"id": "bc224455-95ed-4e33-864d-442396301cd4",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# Staging vers Gold"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 1,
|
||||
"id": "d5dff9f3-ec7d-4fc7-8471-5ed1fbf6cf06",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"from pathlib import Path\n",
|
||||
"import pandas as pd"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 2,
|
||||
"id": "4e5779f6-e0ad-46f8-b684-49af4205f084",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"staging_path = Path(\"../PLESNA Compta SYSTEM/staging\")\n",
|
||||
"assert staging_path.exists()\n",
|
||||
"gold_path = Path(\"../PLESNA Compta SYSTEM/gold\")\n",
|
||||
"assert gold_path.exists()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 3,
|
||||
"id": "2074af18-4f81-49cb-9d9c-f50e7408e7fc",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"def to_csv(df, dest):\n",
|
||||
" if dest.exists():\n",
|
||||
" df.to_csv(dest, mode=\"a\", header=False, index=False)\n",
|
||||
" else:\n",
|
||||
" df.to_csv(dest, index=False)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"id": "cc74ba91-855a-41e7-8709-122425f98fb6",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"### clean gold"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 4,
|
||||
"id": "82de8bc5-8d1e-47fb-af28-076ed90835a9",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"for f in gold_path.glob(\"**/*.csv\"):\n",
|
||||
" f.unlink()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"id": "539446e1-835e-4d79-a8d8-ddd5823f30f9",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## CRG"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 5,
|
||||
"id": "a6423b7d-657f-4897-8dd3-fbca68318367",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"[PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2020.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2018.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2022.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2021.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2023.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2019.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/CRG/2017.csv')]\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"crg_path = staging_path / \"CRG\"\n",
|
||||
"assert crg_path.exists()\n",
|
||||
"crg_files = list(crg_path.glob(\"*.csv\"))\n",
|
||||
"print(crg_files)\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 6,
|
||||
"id": "edcf15c4-aa3c-40c7-805d-ae8933decf8c",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"../PLESNA Compta SYSTEM/gold/CRG/2020.csv\n",
|
||||
"../PLESNA Compta SYSTEM/gold/CRG/2018.csv\n",
|
||||
"../PLESNA Compta SYSTEM/gold/CRG/2022.csv\n",
|
||||
"../PLESNA Compta SYSTEM/gold/CRG/2021.csv\n",
|
||||
"../PLESNA Compta SYSTEM/gold/CRG/2023.csv\n",
|
||||
"../PLESNA Compta SYSTEM/gold/CRG/2019.csv\n",
|
||||
"../PLESNA Compta SYSTEM/gold/CRG/2017.csv\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"for f in crg_files:\n",
|
||||
" df = pd.read_csv(f)\n",
|
||||
" df = df.assign(\n",
|
||||
" Impact = df[\"Crédit\"] - df[\"Débit\"],\n",
|
||||
" )\n",
|
||||
" dest = gold_path / f\"CRG/{f.name}\"\n",
|
||||
" print(dest)\n",
|
||||
" to_csv(df, dest)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"id": "811f6b89-be5a-4290-b3d5-466ec42eb3ae",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Banque"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 7,
|
||||
"id": "c017b0a4-8c41-482e-85b1-4a10be84270b",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"[PosixPath('../PLESNA Compta SYSTEM/staging/Banque/2020.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/Banque/2022.csv'), PosixPath('../PLESNA Compta SYSTEM/staging/Banque/2021.csv')]\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"banque_path = staging_path / \"Banque\"\n",
|
||||
"assert banque_path.exists()\n",
|
||||
"banque_files = list(banque_path.glob(\"*.csv\"))\n",
|
||||
"print(banque_files)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 8,
|
||||
"id": "b04b0d11-dd74-4463-bd6f-c59528cc080e",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"../PLESNA Compta SYSTEM/gold/Banque/2020.csv\n",
|
||||
"../PLESNA Compta SYSTEM/gold/Banque/2022.csv\n",
|
||||
"../PLESNA Compta SYSTEM/gold/Banque/2021.csv\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"for f in banque_files:\n",
|
||||
" df = pd.read_csv(f)\n",
|
||||
" df = df.assign(\n",
|
||||
" Impact = df[\"Crédit\"] - df[\"Débit\"],\n",
|
||||
" )\n",
|
||||
" dest = gold_path / f\"Banque/{f.name}\"\n",
|
||||
" print(dest)\n",
|
||||
" to_csv(df, dest)"
|
||||
]
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "Python 3 (ipykernel)",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
"language_info": {
|
||||
"codemirror_mode": {
|
||||
"name": "ipython",
|
||||
"version": 3
|
||||
},
|
||||
"file_extension": ".py",
|
||||
"mimetype": "text/x-python",
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.11.6"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 5
|
||||
}
|
7
requirements.txt
Normal file
7
requirements.txt
Normal file
@@ -0,0 +1,7 @@
|
||||
jupyter==1.0.0
|
||||
pandas==1.5.0
|
||||
pdf-oralia==0.3.11
|
||||
pydantic==2.6.1
|
||||
click==8.1.7
|
||||
dlt[duckdb]>=0.4.3a0
|
||||
openpyxl>=3.0.0
|
90
scripts/__main__.py
Normal file
90
scripts/__main__.py
Normal file
@@ -0,0 +1,90 @@
|
||||
import logging
|
||||
from logging.config import dictConfig
|
||||
from pathlib import Path
|
||||
|
||||
import click
|
||||
|
||||
from .flux import consume_fluxes
|
||||
|
||||
DATA_PATH = Path("datas/")
|
||||
assert DATA_PATH.exists()
|
||||
HISTORY_PATH = DATA_PATH / "Histoire"
|
||||
assert HISTORY_PATH.exists()
|
||||
STAGING_PATH = DATA_PATH / "staging"
|
||||
assert STAGING_PATH.exists()
|
||||
GOLD_PATH = DATA_PATH / "gold"
|
||||
assert GOLD_PATH.exists()
|
||||
MART_PATH = DATA_PATH / "datamart"
|
||||
assert MART_PATH.exists()
|
||||
|
||||
|
||||
@click.group()
|
||||
@click.option("--debug/--no-debug", default=False)
|
||||
def main(debug):
|
||||
if debug:
|
||||
logging_level = logging.DEBUG
|
||||
else:
|
||||
logging_level = logging.INFO
|
||||
logging_config = dict(
|
||||
version=1,
|
||||
formatters={"f": {"format": "%(levelname)-8s %(name)-12s %(message)s"}},
|
||||
handlers={
|
||||
"h": {
|
||||
"class": "logging.StreamHandler",
|
||||
"formatter": "f",
|
||||
"level": logging_level,
|
||||
}
|
||||
},
|
||||
root={
|
||||
"handlers": ["h"],
|
||||
"level": logging_level,
|
||||
},
|
||||
)
|
||||
|
||||
dictConfig(logging_config)
|
||||
|
||||
|
||||
@main.command()
|
||||
def ingest():
|
||||
from .history_stagging import FLUXES_CRG
|
||||
|
||||
history_crg_path = HISTORY_PATH / "CRG"
|
||||
assert history_crg_path.exists()
|
||||
staging_crg_path = STAGING_PATH / "CRG"
|
||||
assert staging_crg_path.exists()
|
||||
consume_fluxes(
|
||||
fluxes=FLUXES_CRG,
|
||||
origin_path=history_crg_path,
|
||||
dest_path=staging_crg_path,
|
||||
)
|
||||
|
||||
|
||||
@main.command()
|
||||
def feature():
|
||||
from .stagging_gold import FLUXES_CRG
|
||||
|
||||
staging_crg_path = STAGING_PATH / "CRG"
|
||||
assert staging_crg_path.exists()
|
||||
gold_crg_path = GOLD_PATH / "CRG"
|
||||
assert gold_crg_path.exists()
|
||||
|
||||
consume_fluxes(
|
||||
fluxes=FLUXES_CRG(staging_crg_path),
|
||||
origin_path=staging_crg_path,
|
||||
dest_path=gold_crg_path,
|
||||
)
|
||||
|
||||
|
||||
@main.command()
|
||||
def datamart():
|
||||
from .gold_mart import FLUXES_LOT
|
||||
|
||||
consume_fluxes(
|
||||
fluxes=FLUXES_LOT,
|
||||
origin_path=GOLD_PATH,
|
||||
dest_path=MART_PATH,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
177
scripts/flux.py
Normal file
177
scripts/flux.py
Normal file
@@ -0,0 +1,177 @@
|
||||
import logging
|
||||
from abc import abstractmethod
|
||||
from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
|
||||
import pandas as pd
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class Source(BaseModel):
|
||||
filename: str
|
||||
|
||||
@abstractmethod
|
||||
def get_df(self) -> pd.DataFrame:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class ExcelSource(Source):
|
||||
sheet_name: str
|
||||
|
||||
def get_df(self, base_path: Path) -> pd.DataFrame:
|
||||
filepath = base_path / self.filename
|
||||
logging.debug(f"Get content of {filepath}")
|
||||
return pd.read_excel(filepath, sheet_name=self.sheet_name)
|
||||
|
||||
|
||||
class CSVSource(Source):
|
||||
options: dict = {}
|
||||
|
||||
def get_df(self, base_path: Path) -> pd.DataFrame:
|
||||
filepath = base_path / self.filename
|
||||
logging.debug(f"Get content of {filepath}")
|
||||
return pd.read_csv(filepath, **self.options)
|
||||
|
||||
|
||||
class Transformation(BaseModel):
|
||||
function: Callable
|
||||
extra_kwrds: dict = {}
|
||||
|
||||
|
||||
def to_csv(df, dest_basename: Path) -> Path:
|
||||
dest = dest_basename.parent / (dest_basename.stem + ".csv")
|
||||
if dest.exists():
|
||||
df.to_csv(dest, mode="a", header=False, index=False)
|
||||
else:
|
||||
df.to_csv(dest, index=False)
|
||||
return dest
|
||||
|
||||
|
||||
def to_excel(df, dest_basename: Path) -> Path:
|
||||
dest = dest_basename.parent / (dest_basename.stem + ".xlsx")
|
||||
if dest.exists():
|
||||
raise ValueError(f"The destination exits {dest}")
|
||||
else:
|
||||
df.to_excel(dest)
|
||||
return dest
|
||||
|
||||
|
||||
class Destination(BaseModel):
|
||||
name: str
|
||||
writer: Callable = Field(to_csv)
|
||||
|
||||
def _write(
|
||||
self,
|
||||
df: pd.DataFrame,
|
||||
dest_basename: Path,
|
||||
writing_func: Callable | None = None,
|
||||
) -> Path:
|
||||
if writing_func is None:
|
||||
writing_func = self.writer
|
||||
|
||||
return writing_func(df, dest_basename)
|
||||
|
||||
def write(
|
||||
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable | None = None
|
||||
) -> list[Path]:
|
||||
dest_basename = dest_path / self.name
|
||||
return [self._write(df, dest_basename, writing_func)]
|
||||
|
||||
|
||||
class SplitDestination(Destination):
|
||||
split_column: str
|
||||
|
||||
def write(
|
||||
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable | None = None
|
||||
) -> list[Path]:
|
||||
wrote_files = []
|
||||
|
||||
for col_value in df[self.split_column].unique():
|
||||
filtered_df = df[df[self.split_column] == col_value]
|
||||
|
||||
dest_basename = dest_path / f"{self.name}-{col_value}"
|
||||
dest = self._write(filtered_df, dest_basename, writing_func)
|
||||
wrote_files.append(dest)
|
||||
|
||||
return wrote_files
|
||||
|
||||
|
||||
class Flux(BaseModel):
|
||||
sources: list[Source]
|
||||
transformation: Transformation
|
||||
destination: Destination
|
||||
|
||||
|
||||
def write_split_by(
|
||||
df: pd.DataFrame, column: str, dest_path: Path, name: str, writing_func
|
||||
) -> list[Path]:
|
||||
wrote_files = []
|
||||
|
||||
for col_value in df[column].unique():
|
||||
filtered_df = df[df[column] == col_value]
|
||||
|
||||
dest_basename = dest_path / f"{name}-{col_value}"
|
||||
dest = writing_func(filtered_df, dest_basename)
|
||||
wrote_files.append(dest)
|
||||
|
||||
return wrote_files
|
||||
|
||||
|
||||
def extract_sources(sources: list[Source], base_path: Path = Path()):
|
||||
for src in sources:
|
||||
if "*" in src.filename:
|
||||
expanded_src = [
|
||||
src.model_copy(update={"filename": str(p.relative_to(base_path))})
|
||||
for p in base_path.glob(src.filename)
|
||||
]
|
||||
yield from extract_sources(expanded_src, base_path)
|
||||
else:
|
||||
filepath = base_path / src.filename
|
||||
assert filepath.exists
|
||||
yield src.filename, src.get_df(base_path)
|
||||
|
||||
|
||||
def split_duplicates(
|
||||
df, origin: str, duplicated: dict[str, pd.DataFrame]
|
||||
) -> [pd.DataFrame, dict[str, pd.DataFrame]]:
|
||||
duplicates = df.duplicated()
|
||||
no_duplicates = df[~duplicates]
|
||||
duplicated[origin] = df[duplicates]
|
||||
return no_duplicates, duplicated
|
||||
|
||||
|
||||
def consume_flux(
|
||||
name: str,
|
||||
flux: Flux,
|
||||
origin_path: Path,
|
||||
dest_path: Path,
|
||||
duplicated={},
|
||||
):
|
||||
logging.info(f"Consume {name}")
|
||||
src_df = []
|
||||
for filename, df in extract_sources(flux.sources, origin_path):
|
||||
logging.info(f"Extracting {filename}")
|
||||
df, duplicated = split_duplicates(df, str(filename), duplicated)
|
||||
src_df.append(df)
|
||||
|
||||
logging.info(f"Execute {flux.transformation.function.__name__}")
|
||||
df = flux.transformation.function(src_df, **flux.transformation.extra_kwrds)
|
||||
|
||||
files = flux.destination.write(df, dest_path)
|
||||
|
||||
logging.info(f"{files} written")
|
||||
return files
|
||||
|
||||
|
||||
def consume_fluxes(
|
||||
fluxes: dict[str, Flux],
|
||||
origin_path: Path,
|
||||
dest_path: Path,
|
||||
):
|
||||
duplicated = {}
|
||||
wrote_files = []
|
||||
|
||||
for name, flux in fluxes.items():
|
||||
files = consume_flux(name, flux, origin_path, dest_path, duplicated)
|
||||
wrote_files += files
|
||||
return wrote_files
|
85
scripts/gold_mart.py
Normal file
85
scripts/gold_mart.py
Normal file
@@ -0,0 +1,85 @@
|
||||
import logging
|
||||
from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
|
||||
import pandas as pd
|
||||
|
||||
from scripts.flux import (
|
||||
CSVSource,
|
||||
Destination,
|
||||
Flux,
|
||||
SplitDestination,
|
||||
Transformation,
|
||||
consume_fluxes,
|
||||
to_excel,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
def build_lots(dfs: list[pd.DataFrame]) -> pd.DataFrame:
|
||||
df = pd.concat(dfs)
|
||||
return df
|
||||
|
||||
|
||||
FLUXES_LOT = {
|
||||
"Lots": Flux(
|
||||
sources=[CSVSource(filename="CRG/crg-*.csv")],
|
||||
transformation=Transformation(function=build_lots),
|
||||
destination=SplitDestination(
|
||||
name="Lot/lot", split_column="Lot", writer=to_excel
|
||||
),
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
def build_pnl(dfs: list[pd.DataFrame], year: int) -> pd.DataFrame:
|
||||
df = pd.concat(dfs)
|
||||
df = df[df["Année"] == year]
|
||||
pt = df.groupby(["Catégorie", "Mois"]).agg("sum").unstack().fillna(0)
|
||||
pt.columns = [c[1] for c in pt.columns]
|
||||
pt.reset_index(["Catégorie"])
|
||||
return pt
|
||||
|
||||
|
||||
def build_pnl_flux(year: int) -> Flux:
|
||||
return Flux(
|
||||
sources=[
|
||||
CSVSource(filename=f"CRG/crg-{year}.csv"),
|
||||
CSVSource(filename=f"banque/banque-{year}.csv"),
|
||||
],
|
||||
transformation=Transformation(
|
||||
function=build_pnl,
|
||||
extra_kwrds={"year": year},
|
||||
),
|
||||
destination=Destination(name=f"pnl/{year}"),
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
data_path = Path("datas/")
|
||||
assert data_path.exists()
|
||||
|
||||
gold_path = data_path / "gold"
|
||||
assert gold_path.exists()
|
||||
gold_crg_path = gold_path / "CRG"
|
||||
assert gold_crg_path.exists()
|
||||
|
||||
mart_path = data_path / "datamart"
|
||||
assert mart_path.exists()
|
||||
|
||||
files = consume_fluxes(
|
||||
fluxes=FLUXES_LOT, origin_path=gold_path, dest_path=mart_path
|
||||
)
|
||||
|
||||
years = list(range(2017, 2024))
|
||||
# pnl_fluxes = {f"pnl-{year}": build_pnl_flux(year) for year in years}
|
||||
pnl_fluxes = {}
|
||||
|
||||
files = consume_fluxes(
|
||||
fluxes=pnl_fluxes,
|
||||
origin_path=gold_path,
|
||||
dest_path=mart_path,
|
||||
)
|
||||
print(files)
|
194
scripts/history_stagging.py
Normal file
194
scripts/history_stagging.py
Normal file
@@ -0,0 +1,194 @@
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
import pandas as pd
|
||||
|
||||
from scripts.flux import consume_fluxes
|
||||
|
||||
from .flux import Destination, ExcelSource, Flux, SplitDestination, Transformation
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
def extract_cat(cat: pd.DataFrame):
|
||||
cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"])
|
||||
# cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"]["Anciennes"])
|
||||
cat_trans = cat[cat["Nouvelles"] != "NE PAS IMPORTER"]
|
||||
trans = {}
|
||||
for _, (old, new) in cat_trans.iterrows():
|
||||
trans[old] = new
|
||||
|
||||
return trans, cat_drop
|
||||
|
||||
|
||||
def lot_naming(value):
|
||||
try:
|
||||
v = int(value)
|
||||
except ValueError:
|
||||
return "PC"
|
||||
return str(v).zfill(2)
|
||||
|
||||
|
||||
def trans_2017_2021(
|
||||
dfs: list[pd.DataFrame], stagging_columns: list[str], **kwrds
|
||||
) -> pd.DataFrame:
|
||||
df, cat = dfs
|
||||
cat_trans, cat_drop = extract_cat(cat)
|
||||
|
||||
df = df[~df["Categorie"].isin(cat_drop)]
|
||||
|
||||
df = df.assign(
|
||||
Immeuble=df["immeuble"],
|
||||
Porte=df["porte"],
|
||||
Débit=df["Débit"].fillna(0),
|
||||
Crédit=df["Crédit"].fillna(0),
|
||||
Lot=df["porte"].apply(lot_naming),
|
||||
Année=df["Date"].astype(str).str.slice(0, 4),
|
||||
Mois=df["Date"].astype(str).str.slice(5, 7),
|
||||
Catégorie=df["Categorie"].replace(cat_trans),
|
||||
Fournisseur="",
|
||||
)
|
||||
|
||||
return df[stagging_columns]
|
||||
|
||||
|
||||
def trans_2022_charge(
|
||||
dfs: list[pd.DataFrame], stagging_columns: list[str], **kwrds
|
||||
) -> pd.DataFrame:
|
||||
df = dfs[0]
|
||||
df = df.assign(
|
||||
Immeuble=df["immeuble"],
|
||||
Porte=df["lot"],
|
||||
Débit=df["Débits"].fillna(0),
|
||||
Crédit=df["Crédits"].fillna(0),
|
||||
Lot=df["lot"].apply(lot_naming),
|
||||
Année=df["annee"],
|
||||
Mois=df["mois"],
|
||||
Catégorie=df["Catégorie Charge"],
|
||||
# Catégorie=df["Catégorie Charge"].replace(trans),
|
||||
Fournisseur="",
|
||||
Régie="Oralia - Gelas",
|
||||
Libellé="",
|
||||
)
|
||||
return df[stagging_columns]
|
||||
|
||||
|
||||
def trans_2022_loc(
|
||||
dfs: list[pd.DataFrame], stagging_columns: list[str], **kwrds
|
||||
) -> pd.DataFrame:
|
||||
df = dfs[0]
|
||||
df = df.assign(
|
||||
Immeuble=df["immeuble"],
|
||||
Porte=df["lot"],
|
||||
Débit=0,
|
||||
Crédit=df["Réglés"].fillna(0),
|
||||
Lot=df["lot"].apply(lot_naming),
|
||||
Année=df["annee"],
|
||||
Mois=df["mois"],
|
||||
Catégorie="Loyer Charge",
|
||||
Fournisseur="",
|
||||
Régie="Oralia - Gelas",
|
||||
Libellé="",
|
||||
)
|
||||
return df[stagging_columns]
|
||||
|
||||
|
||||
def trans_2023(
|
||||
dfs: list[pd.DataFrame], year: str, stagging_columns: list[str], **kwrds
|
||||
) -> pd.DataFrame:
|
||||
df = dfs[0]
|
||||
df = df.assign(
|
||||
Débit=df["Débit"].fillna(0),
|
||||
Crédit=df["Crédit"].fillna(0),
|
||||
Lot=df["Porte"].apply(lot_naming),
|
||||
Année=year,
|
||||
)
|
||||
return df[stagging_columns]
|
||||
|
||||
|
||||
STAGGING_COLUMNS = [
|
||||
"Régie",
|
||||
"Immeuble",
|
||||
"Porte",
|
||||
"Lot",
|
||||
"Année",
|
||||
"Mois",
|
||||
"Catégorie",
|
||||
"Fournisseur",
|
||||
"Libellé",
|
||||
"Débit",
|
||||
"Crédit",
|
||||
]
|
||||
|
||||
FLUXES_CRG = {
|
||||
"2017 2021 - charge et locataire.xlsx": Flux(
|
||||
sources=[
|
||||
ExcelSource(
|
||||
filename="2017 2021 - charge et locataire.xlsx", sheet_name="DB CRG"
|
||||
),
|
||||
ExcelSource(
|
||||
filename="2017 2021 - charge et locataire.xlsx",
|
||||
sheet_name="Catégories",
|
||||
),
|
||||
],
|
||||
transformation=Transformation(
|
||||
function=trans_2017_2021,
|
||||
extra_kwrds={"stagging_columns": STAGGING_COLUMNS},
|
||||
),
|
||||
destination=SplitDestination(name="crg", split_column="Année"),
|
||||
),
|
||||
"2022 - charge.xlsx": Flux(
|
||||
sources=[
|
||||
ExcelSource(filename="2022 - charge.xlsx", sheet_name="DB CRG"),
|
||||
],
|
||||
transformation=Transformation(
|
||||
function=trans_2022_charge,
|
||||
extra_kwrds={"stagging_columns": STAGGING_COLUMNS},
|
||||
),
|
||||
destination=SplitDestination(name="crg", split_column="Année"),
|
||||
),
|
||||
"2022 - locataire.xlsx": Flux(
|
||||
sources=[
|
||||
ExcelSource(filename="2022 - locataire.xlsx", sheet_name="Sheet1"),
|
||||
],
|
||||
transformation=Transformation(
|
||||
function=trans_2022_loc,
|
||||
extra_kwrds={"stagging_columns": STAGGING_COLUMNS},
|
||||
),
|
||||
destination=SplitDestination(name="crg", split_column="Année"),
|
||||
),
|
||||
"2023 - charge et locataire.xlsx": Flux(
|
||||
sources=[
|
||||
ExcelSource(
|
||||
filename="2023 - charge et locataire.xlsx",
|
||||
sheet_name="DB CRG 2023 ...",
|
||||
),
|
||||
],
|
||||
transformation=Transformation(
|
||||
function=trans_2023,
|
||||
extra_kwrds={"year": 2023, "stagging_columns": STAGGING_COLUMNS},
|
||||
),
|
||||
destination=SplitDestination(name="crg", split_column="Année"),
|
||||
),
|
||||
}
|
||||
|
||||
if __name__ == "__main__":
|
||||
data_path = Path("datas/")
|
||||
assert data_path.exists()
|
||||
history_path = data_path / "Histoire"
|
||||
assert history_path.exists()
|
||||
history_crg_path = history_path / "CRG"
|
||||
assert history_crg_path.exists()
|
||||
|
||||
staging_path = data_path / "staging"
|
||||
assert staging_path.exists()
|
||||
staging_crg_path = staging_path / "CRG"
|
||||
assert staging_crg_path.exists()
|
||||
|
||||
crg_files = consume_fluxes(
|
||||
fluxes=FLUXES_CRG,
|
||||
origin_path=history_crg_path,
|
||||
dest_path=staging_crg_path,
|
||||
)
|
||||
print(crg_files)
|
@@ -4,7 +4,7 @@ import pandas as pd
|
||||
from pydantic import BaseModel, ValidationError
|
||||
|
||||
|
||||
class Interseptor:
|
||||
class ValidationInterseptor:
|
||||
def __init__(self, model: BaseModel):
|
||||
self.model = model
|
||||
self.not_valid_rows = []
|
||||
@@ -18,8 +18,10 @@ class Interseptor:
|
||||
try:
|
||||
self.model(**r)
|
||||
except ValidationError:
|
||||
r["InterseptorOrigin"] = func.__name__
|
||||
r["InterseptorIndex"] = i
|
||||
r["ValidationInterseptorFunc"] = func.__name__
|
||||
r["ValidationInterseptorArgs"] = args
|
||||
r["ValidationInterseptorKwrds"] = kwrds
|
||||
r["ValidationInterseptorIndex"] = i
|
||||
self.not_valid_rows.append(r)
|
||||
else:
|
||||
valid_rows.append(r)
|
||||
|
76
scripts/stagging_gold.py
Normal file
76
scripts/stagging_gold.py
Normal file
@@ -0,0 +1,76 @@
|
||||
import logging
|
||||
from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
|
||||
import pandas as pd
|
||||
|
||||
from scripts.flux import CSVSource, Destination, Flux, Transformation, consume_fluxes
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
def feature_crg(dfs: list[pd.DataFrame]) -> pd.DataFrame:
|
||||
df = dfs[0]
|
||||
df = df.assign(
|
||||
Impact=df["Crédit"] - df["Débit"],
|
||||
Lot=df["Immeuble"].astype(str) + df["Lot"].astype("str"),
|
||||
)
|
||||
return df
|
||||
|
||||
|
||||
GOLD_COLUMNS = [
|
||||
"Régie",
|
||||
"Immeuble",
|
||||
"Porte",
|
||||
"Lot",
|
||||
"Année",
|
||||
"Mois",
|
||||
"Catégorie",
|
||||
"Fournisseur",
|
||||
"Libellé",
|
||||
"Débit",
|
||||
"Crédit",
|
||||
"Impact",
|
||||
]
|
||||
|
||||
|
||||
def build_crg_fluxes(
|
||||
crg_path: Path, pattern: str, transformation: Callable, csv_options: dict = {}
|
||||
) -> dict[str, Flux]:
|
||||
fluxes = {}
|
||||
for file in crg_path.glob(pattern):
|
||||
fluxes[f"CRG - {file.name}"] = Flux(
|
||||
sources=[CSVSource(filename=file.name, options=csv_options)],
|
||||
transformation=Transformation(function=transformation),
|
||||
destination=Destination(name=file.name),
|
||||
)
|
||||
return fluxes
|
||||
|
||||
|
||||
def FLUXES_CRG(staging_crg_path: Path):
|
||||
return build_crg_fluxes(
|
||||
crg_path=staging_crg_path, pattern="*.csv", transformation=feature_crg
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
data_path = Path("datas/")
|
||||
assert data_path.exists()
|
||||
|
||||
staging_path = data_path / "staging"
|
||||
assert staging_path.exists()
|
||||
staging_crg_path = staging_path / "CRG"
|
||||
assert staging_crg_path.exists()
|
||||
|
||||
gold_path = data_path / "gold"
|
||||
assert gold_path.exists()
|
||||
gold_crg_path = gold_path / "CRG"
|
||||
assert gold_crg_path.exists()
|
||||
|
||||
crg_files = consume_fluxes(
|
||||
fluxes=FLUXES_CRG(staging_crg_path),
|
||||
origin_path=staging_crg_path,
|
||||
dest_path=gold_crg_path,
|
||||
)
|
||||
print(crg_files)
|
@@ -4,7 +4,7 @@ import pandas as pd
|
||||
import pytest
|
||||
from pydantic import BaseModel
|
||||
|
||||
from scripts.intersept_not_valid import Interseptor
|
||||
from scripts.intersept_not_valid import ValidationInterseptor
|
||||
|
||||
|
||||
class FakeModel(BaseModel):
|
||||
@@ -13,7 +13,7 @@ class FakeModel(BaseModel):
|
||||
|
||||
|
||||
def test_init_composed():
|
||||
interceptor = Interseptor(FakeModel)
|
||||
interceptor = ValidationInterseptor(FakeModel)
|
||||
|
||||
def df_generator(nrows=3):
|
||||
records = [{"name": "plop", "age": random.randint(1, 50)} for _ in range(nrows)]
|
||||
@@ -27,7 +27,7 @@ def test_init_composed():
|
||||
|
||||
|
||||
def test_init_decorator():
|
||||
interceptor = Interseptor(FakeModel)
|
||||
interceptor = ValidationInterseptor(FakeModel)
|
||||
|
||||
@interceptor
|
||||
def df_generator(nrows=3):
|
||||
@@ -40,7 +40,7 @@ def test_init_decorator():
|
||||
|
||||
|
||||
def test_intersept_not_valid():
|
||||
interceptor = Interseptor(FakeModel)
|
||||
interceptor = ValidationInterseptor(FakeModel)
|
||||
|
||||
@interceptor
|
||||
def df_generator():
|
||||
@@ -57,7 +57,9 @@ def test_intersept_not_valid():
|
||||
{
|
||||
"name": "hop",
|
||||
"age": "ui",
|
||||
"InterseptorOrigin": "df_generator",
|
||||
"InterseptorIndex": 1,
|
||||
"ValidationInterseptorFunc": "df_generator",
|
||||
"ValidationInterseptorArgs": (),
|
||||
"ValidationInterseptorKwrds": {},
|
||||
"ValidationInterseptorIndex": 1,
|
||||
}
|
||||
]
|
||||
|
Reference in New Issue
Block a user