Feat: add history_stagging

This commit is contained in:
Bertrand Benjamin 2024-03-02 18:18:06 +01:00
parent 3916915e22
commit f56edac92c
2 changed files with 210 additions and 0 deletions

4
requirements.txt Normal file
View File

@ -0,0 +1,4 @@
jupyter==1.0.0
pandas==1.5.0
pdf-oralia==0.3.11
pydantic==2.6.1

206
scripts/history_stagging.py Normal file
View File

@ -0,0 +1,206 @@
import logging
from collections.abc import Callable
from pathlib import Path
import pandas as pd
from pydantic import BaseModel
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
class Source(BaseModel):
filename: str
sheet_name: str = ""
class Flux(BaseModel):
sources: list[Source]
transformation: Callable
extra_kwrds: dict = {}
def to_csv(df, dest_basename):
dest = dest_basename.parent / (dest_basename.name + ".csv")
if dest.exists():
df.to_csv(dest, mode="a", header=False, index=False)
else:
df.to_csv(dest, index=False)
return dest
def write_split_by(
df: pd.DataFrame, column: str, dest_path: Path, writing_func
) -> list[Path]:
wrote_files = []
for col_value in df[column].unique():
filtered_df = df[df[column] == col_value]
dest_basename = dest_path / f"{col_value}"
dest = writing_func(filtered_df, dest_basename)
wrote_files.append(dest)
return wrote_files
def extract_sources(sources: list[Source], base_path: Path = Path()):
for src in sources:
filepath = base_path / src.filename
assert filepath.exists
yield src.filename, pd.read_excel(filepath, sheet_name=src.sheet_name)
def split_duplicates(
df, origin: str, duplicated: dict[str, pd.DataFrame]
) -> [pd.DataFrame, dict[str, pd.DataFrame]]:
duplicates = df.duplicated()
no_duplicates = df[~duplicates]
duplicated[origin] = df[duplicates]
return no_duplicates, duplicated
def crg(history_path: Path, staging_path: Path, metadatas: dict, writing_func=to_csv):
duplicated = {}
wrote_files = []
for name, metadata in metadatas.items():
logger.debug(f"Processing {name}")
src_df = []
for filename, df in extract_sources(metadata.sources, history_path):
df, duplicated = split_duplicates(df, str(filename), duplicated)
src_df.append(df)
df = metadata.transformation(src_df, **metadata.extra_kwrds)
files = write_split_by(df, "Année", staging_path, writing_func)
wrote_files += files
return wrote_files
def trans_2017_2021(dfs, **kwrds):
df, cat = dfs
cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"]["Anciennes"])
cat_trans = cat[cat["Nouvelles"] != "NE PAS IMPORTER"]
trans = {}
for _, (old, new) in cat_trans.iterrows():
trans[old] = new
df = df[~df["Categorie"].isin(cat_drop)]
df = df.assign(
Immeuble=df["immeuble"],
Porte=df["porte"],
Débit=df["Débit"].fillna(0),
Crédit=df["Crédit"].fillna(0),
Lot=df["immeuble"].astype(str) + df["porte"].astype("str").str.zfill(2),
Année=df["Date"].astype(str).str.slice(0, 4),
Mois=df["Date"].astype(str).str.slice(5, 7),
Catégorie=df["Categorie"].replace(trans),
Fournisseur="",
)
return df
def trans_2022_charge(dfs, **kwrds):
df = dfs[0]
df = df.assign(
Immeuble=df["immeuble"],
Porte=df["lot"],
Débit=df["Débits"].fillna(0),
Crédit=df["Crédits"].fillna(0),
Lot=df["immeuble"].astype(str)[0] + df["lot"].astype("str").str.zfill(2),
Année=df["annee"],
Mois=df["mois"],
Catégorie=df["Catégorie Charge"],
# Catégorie=df["Catégorie Charge"].replace(trans),
Fournisseur="",
Régie="Oralia - Gelas",
Libellé="",
)
return df
def trans_2022_loc(dfs, **kwrds):
df = dfs[0]
df = df.assign(
Immeuble=df["immeuble"],
Porte=df["lot"],
Débit=0,
Crédit=df["Réglés"].fillna(0),
Lot=df["immeuble"].astype(str)[0] + df["lot"].astype("str").str.zfill(2),
Année=df["annee"],
Mois=df["mois"],
Catégorie="Loyer Charge",
Fournisseur="",
Régie="Oralia - Gelas",
Libellé="",
)
return df
def trans_2023(dfs, year, **kwrds):
df = dfs[0]
df = df.assign(
Débit=df["Débit"].fillna(0),
Crédit=df["Crédit"].fillna(0),
Lot=df["Immeuble"].astype(str) + df["Porte"].astype("str").str.zfill(2),
Année=year,
)
return df
METADATAS = {
"2017 2021 - charge et locataire.xlsx": Flux(
sources=[
Source(
filename="2017 2021 - charge et locataire.xlsx", sheet_name="DB CRG"
),
Source(
filename="2017 2021 - charge et locataire.xlsx",
sheet_name="Catégories",
),
],
transformation=trans_2017_2021,
),
"2022 - charge.xlsx": Flux(
sources=[
Source(filename="2022 - charge.xlsx", sheet_name="Sheet1"),
],
transformation=trans_2022_charge,
),
"2022 - locataire.xlsx": Flux(
sources=[
Source(filename="2022 - locataire.xlsx", sheet_name="Sheet1"),
],
transformation=trans_2022_loc,
),
"2023 - charge et locataire.xlsx": Flux(
sources=[
Source(
filename="2023 - charge et locataire.xlsx",
sheet_name="DB CRG 2023 ...",
),
],
transformation=trans_2023,
extra_kwrds={"year": 2023},
),
}
if __name__ == "__main__":
data_path = Path("datas/")
assert data_path.exists()
history_path = data_path / "Histoire"
assert history_path.exists()
history_crg_path = history_path / "CRG"
assert history_crg_path.exists()
staging_path = data_path / "staging"
assert staging_path.exists()
staging_crg_path = staging_path / "CRG"
assert staging_crg_path.exists()
crg_files = crg(history_crg_path, staging_crg_path, METADATAS)
print(crg_files)