diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..65a5a79
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,4 @@
+jupyter==1.0.0
+pandas==1.5.0
+pdf-oralia==0.3.11
+pydantic==2.6.1
diff --git a/scripts/history_staging.py b/scripts/history_staging.py
new file mode 100644
index 0000000..b1fc422
--- /dev/null
+++ b/scripts/history_staging.py
@@ -0,0 +1,212 @@
+import logging
+from collections.abc import Callable
+from pathlib import Path
+
+import pandas as pd
+from pydantic import BaseModel
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.DEBUG)
+
+
+class Source(BaseModel):
+    filename: str
+    sheet_name: str | int = 0  # pandas: 0 means the first sheet
+
+
+class Flux(BaseModel):
+    sources: list[Source]
+    transformation: Callable
+    extra_kwargs: dict = {}
+
+
+def to_csv(df, dest_basename):
+    """Write df to `<dest_basename>.csv`, appending without header if it exists."""
+    dest = dest_basename.parent / (dest_basename.name + ".csv")
+    if dest.exists():
+        df.to_csv(dest, mode="a", header=False, index=False)
+    else:
+        df.to_csv(dest, index=False)
+    return dest
+
+
+def write_split_by(
+    df: pd.DataFrame, column: str, dest_path: Path, writing_func
+) -> list[Path]:
+    """Split df on the values of `column` and write one file per value."""
+    wrote_files = []
+
+    for col_value in df[column].unique():
+        filtered_df = df[df[column] == col_value]
+
+        dest_basename = dest_path / f"{col_value}"
+        dest = writing_func(filtered_df, dest_basename)
+        wrote_files.append(dest)
+
+    return wrote_files
+
+
+def extract_sources(sources: list[Source], base_path: Path = Path()):
+    """Yield (filename, DataFrame) for every source sheet."""
+    for src in sources:
+        filepath = base_path / src.filename
+        assert filepath.exists()
+        yield src.filename, pd.read_excel(filepath, sheet_name=src.sheet_name)
+
+
+def split_duplicates(
+    df, origin: str, duplicated: dict[str, pd.DataFrame]
+) -> tuple[pd.DataFrame, dict[str, pd.DataFrame]]:
+    """Drop duplicated rows from df, recording them in `duplicated` under `origin`."""
+    duplicates = df.duplicated()
+    no_duplicates = df[~duplicates]
+    duplicated[origin] = df[duplicates]
+    return no_duplicates, duplicated
+
+
+def crg(history_path: Path, staging_path: Path, metadatas: dict, writing_func=to_csv):
+    """Extract, transform and stage every history file described in `metadatas`."""
+    duplicated = {}
+    wrote_files = []
+
+    for name, metadata in metadatas.items():
+        logger.debug(f"Processing {name}")
+        src_df = []
+        for filename, df in extract_sources(metadata.sources, history_path):
+            df, duplicated = split_duplicates(df, str(filename), duplicated)
+            src_df.append(df)
+
+        df = metadata.transformation(src_df, **metadata.extra_kwargs)
+
+        files = write_split_by(df, "Année", staging_path, writing_func)
+        wrote_files += files
+    return wrote_files
+
+
+def trans_2017_2021(dfs, **kwargs):
+    df, cat = dfs
+    cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"]["Anciennes"])
+    cat_trans = cat[cat["Nouvelles"] != "NE PAS IMPORTER"]
+
+    # Map old category names to the new ones.
+    trans = {}
+    for _, (old, new) in cat_trans.iterrows():
+        trans[old] = new
+
+    df = df[~df["Categorie"].isin(cat_drop)]
+
+    df = df.assign(
+        Immeuble=df["immeuble"],
+        Porte=df["porte"],
+        Débit=df["Débit"].fillna(0),
+        Crédit=df["Crédit"].fillna(0),
+        Lot=df["immeuble"].astype(str) + df["porte"].astype(str).str.zfill(2),
+        Année=df["Date"].astype(str).str.slice(0, 4),
+        Mois=df["Date"].astype(str).str.slice(5, 7),
+        Catégorie=df["Categorie"].replace(trans),
+        Fournisseur="",
+    )
+
+    return df
+
+
+def trans_2022_charge(dfs, **kwargs):
+    df = dfs[0]
+    df = df.assign(
+        Immeuble=df["immeuble"],
+        Porte=df["lot"],
+        Débit=df["Débits"].fillna(0),
+        Crédit=df["Crédits"].fillna(0),
+        Lot=df["immeuble"].astype(str) + df["lot"].astype(str).str.zfill(2),
+        Année=df["annee"],
+        Mois=df["mois"],
+        Catégorie=df["Catégorie Charge"],
+        # Catégorie=df["Catégorie Charge"].replace(trans),
+        Fournisseur="",
+        Régie="Oralia - Gelas",
+        Libellé="",
+    )
+    return df
+
+
+def trans_2022_loc(dfs, **kwargs):
+    df = dfs[0]
+    df = df.assign(
+        Immeuble=df["immeuble"],
+        Porte=df["lot"],
+        Débit=0,
+        Crédit=df["Réglés"].fillna(0),
+        Lot=df["immeuble"].astype(str) + df["lot"].astype(str).str.zfill(2),
+        Année=df["annee"],
+        Mois=df["mois"],
+        Catégorie="Loyer Charge",
+        Fournisseur="",
+        Régie="Oralia - Gelas",
+        Libellé="",
+    )
+    return df
+
+
+def trans_2023(dfs, year, **kwargs):
+    df = dfs[0]
+    df = df.assign(
+        Débit=df["Débit"].fillna(0),
+        Crédit=df["Crédit"].fillna(0),
+        Lot=df["Immeuble"].astype(str) + df["Porte"].astype(str).str.zfill(2),
+        Année=year,
+    )
+    return df
+
+
+METADATAS = {
+    "2017 2021 - charge et locataire.xlsx": Flux(
+        sources=[
+            Source(
+                filename="2017 2021 - charge et locataire.xlsx", sheet_name="DB CRG"
+            ),
+            Source(
+                filename="2017 2021 - charge et locataire.xlsx",
+                sheet_name="Catégories",
+            ),
+        ],
+        transformation=trans_2017_2021,
+    ),
+    "2022 - charge.xlsx": Flux(
+        sources=[
+            Source(filename="2022 - charge.xlsx", sheet_name="Sheet1"),
+        ],
+        transformation=trans_2022_charge,
+    ),
+    "2022 - locataire.xlsx": Flux(
+        sources=[
+            Source(filename="2022 - locataire.xlsx", sheet_name="Sheet1"),
+        ],
+        transformation=trans_2022_loc,
+    ),
+    "2023 - charge et locataire.xlsx": Flux(
+        sources=[
+            Source(
+                filename="2023 - charge et locataire.xlsx",
+                sheet_name="DB CRG 2023 ...",
+            ),
+        ],
+        transformation=trans_2023,
+        extra_kwargs={"year": 2023},
+    ),
+}
+
+if __name__ == "__main__":
+    data_path = Path("datas/")
+    assert data_path.exists()
+    history_path = data_path / "Histoire"
+    assert history_path.exists()
+    history_crg_path = history_path / "CRG"
+    assert history_crg_path.exists()
+
+    staging_path = data_path / "staging"
+    assert staging_path.exists()
+    staging_crg_path = staging_path / "CRG"
+    assert staging_crg_path.exists()
+
+    crg_files = crg(history_crg_path, staging_crg_path, METADATAS)
+    print(crg_files)
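Usage note (not part of the diff above): `crg` accepts any `writing_func` with the contract `(df, dest_basename) -> Path`, where `dest_basename` is the destination path without its extension. Below is a minimal sketch of a drop-in Parquet writer; the `scripts.history_staging` import path and the availability of a Parquet engine such as pyarrow are assumptions. The merge-on-exists step mirrors the header-less append of `to_csv`, which matters because the two 2022 fluxes both write into the same "2022" file.

from pathlib import Path

import pandas as pd

# Assumption: the staging script is importable as a module from the repo root.
from scripts.history_staging import METADATAS, crg


def to_parquet(df: pd.DataFrame, dest_basename: Path) -> Path:
    # Same contract as to_csv(): take a filtered DataFrame and a destination
    # basename (no extension), return the path actually written.
    dest = dest_basename.parent / (dest_basename.name + ".parquet")
    if dest.exists():
        # Parquet has no append mode; merge with the rows already on disk,
        # mirroring to_csv's header-less append.
        df = pd.concat([pd.read_parquet(dest), df], ignore_index=True)
    df.to_parquet(dest, index=False)  # requires pyarrow or fastparquet
    return dest


crg_files = crg(
    Path("datas/Histoire/CRG"),
    Path("datas/staging/CRG"),
    METADATAS,
    writing_func=to_parquet,
)
print(crg_files)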