From 25ede1789a1fb1728ccc25c09bb1bd05f79fa8d8 Mon Sep 17 00:00:00 2001
From: Bertrand Benjamin
Date: Sun, 3 Mar 2024 06:39:27 +0100
Subject: [PATCH] Feat: convert stagging2gold

Extract the generic Source/Flux pipeline into scripts/flux.py and reuse
it in a new scripts/stagging_gold.py to build the gold layer from the
staging CSV files.
---
 scripts/flux.py             | 109 +++++++++++++++++++++++++++++
 scripts/history_stagging.py | 134 +++++++++++++-----------------------
 scripts/stagging_gold.py    |  69 +++++++++++++++++++
 3 files changed, 224 insertions(+), 88 deletions(-)
 create mode 100644 scripts/flux.py
 create mode 100644 scripts/stagging_gold.py

diff --git a/scripts/flux.py b/scripts/flux.py
new file mode 100644
index 0000000..037eac5
--- /dev/null
+++ b/scripts/flux.py
@@ -0,0 +1,109 @@
+import logging
+from abc import abstractmethod
+from collections.abc import Callable
+from pathlib import Path
+
+import pandas as pd
+from pydantic import BaseModel
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.DEBUG)
+
+
+class Source(BaseModel):
+    filename: str
+
+    @abstractmethod
+    def get_df(self, base_path: Path) -> pd.DataFrame:
+        raise NotImplementedError
+
+
+class ExcelSource(Source):
+    sheet_name: str
+
+    def get_df(self, base_path: Path) -> pd.DataFrame:
+        filepath = base_path / self.filename
+        logger.debug(f"Get content of {filepath}")
+        return pd.read_excel(filepath, sheet_name=self.sheet_name)
+
+
+class CSVSource(Source):
+    options: dict
+
+    def get_df(self, base_path: Path) -> pd.DataFrame:
+        filepath = base_path / self.filename
+        logger.debug(f"Get content of {filepath}")
+        return pd.read_csv(filepath, **self.options)
+
+
+class Flux(BaseModel):
+    sources: list[Source]
+    transformation: Callable
+    extra_kwrds: dict = {}
+
+
+def to_csv(df: pd.DataFrame, dest_basename: Path) -> Path:
+    dest = dest_basename.parent / (dest_basename.name + ".csv")
+    if dest.exists():
+        df.to_csv(dest, mode="a", header=False, index=False)
+    else:
+        df.to_csv(dest, index=False)
+    return dest
+
+
+def write_split_by(
+    df: pd.DataFrame, column: str, dest_path: Path, writing_func: Callable
+) -> list[Path]:
+    wrote_files = []
+
+    for col_value in df[column].unique():
+        filtered_df = df[df[column] == col_value]
+
+        dest_basename = dest_path / f"{col_value}"
+        dest = writing_func(filtered_df, dest_basename)
+        wrote_files.append(dest)
+
+    return wrote_files
+
+
+def extract_sources(sources: list[Source], base_path: Path = Path()):
+    for src in sources:
+        if "*" in src.filename:
+            expanded_src = [
+                src.model_copy(update={"filename": str(p.relative_to(base_path))})
+                for p in base_path.glob(src.filename)
+            ]
+            yield from extract_sources(expanded_src, base_path)
+        else:
+            filepath = base_path / src.filename
+            assert filepath.exists()
+            yield src.filename, src.get_df(base_path)
+
+
+def split_duplicates(
+    df, origin: str, duplicated: dict[str, pd.DataFrame]
+) -> tuple[pd.DataFrame, dict[str, pd.DataFrame]]:
+    duplicates = df.duplicated()
+    no_duplicates = df[~duplicates]
+    duplicated[origin] = df[duplicates]
+    return no_duplicates, duplicated
+
+
+def consume_fluxes(
+    fluxes: dict[str, Flux], origin_path: Path, dest_path: Path, writing_func=to_csv
+) -> list[Path]:
+    duplicated = {}
+    wrote_files = []
+
+    for name, flux in fluxes.items():
+        logger.info(f"Processing flux {name}")
+        src_df = []
+        for filename, df in extract_sources(flux.sources, origin_path):
+            df, duplicated = split_duplicates(df, str(filename), duplicated)
+            src_df.append(df)
+
+        df = flux.transformation(src_df, **flux.extra_kwrds)
+
+        files = write_split_by(df, "Année", dest_path, writing_func)
+        wrote_files += files
+    return wrote_files
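
Note: a minimal sketch of how the new scripts/flux.py API fits together,
assuming the patch is applied. The file name example.xlsx and the
identity transformation are illustrative only; the real fluxes are
declared in the scripts below.

    from pathlib import Path

    from scripts.flux import ExcelSource, Flux, consume_fluxes


    # Hypothetical pass-through transformation (not part of this patch).
    # Whatever it returns must contain an "Année" column, because
    # consume_fluxes partitions its output on that column.
    def identity(dfs, **kwrds):
        return dfs[0]


    fluxes = {
        "example.xlsx": Flux(
            sources=[ExcelSource(filename="example.xlsx", sheet_name="DB CRG")],
            transformation=identity,
        ),
    }

    # Reads each source from the origin path, drops duplicated rows per
    # source, applies the transformation, then appends one CSV per
    # "Année" value under the destination path.
    written = consume_fluxes(fluxes, Path("datas/history/CRG"), Path("datas/staging/CRG"))
    print(written)
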
diff --git a/scripts/history_stagging.py b/scripts/history_stagging.py
index 77d88a9..4dc8287 100644
--- a/scripts/history_stagging.py
+++ b/scripts/history_stagging.py
@@ -1,84 +1,17 @@
 import logging
-from collections.abc import Callable
 from pathlib import Path
 
 import pandas as pd
-from pydantic import BaseModel
+
+from scripts.flux import consume_fluxes
+
+from .flux import ExcelSource, Flux
 
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.DEBUG)
 
 
-class Source(BaseModel):
-    filename: str
-    sheet_name: str = ""
-
-
-class Flux(BaseModel):
-    sources: list[Source]
-    transformation: Callable
-    extra_kwrds: dict = {}
-
-
-def to_csv(df, dest_basename):
-    dest = dest_basename.parent / (dest_basename.name + ".csv")
-    if dest.exists():
-        df.to_csv(dest, mode="a", header=False, index=False)
-    else:
-        df.to_csv(dest, index=False)
-    return dest
-
-
-def write_split_by(
-    df: pd.DataFrame, column: str, dest_path: Path, writing_func
-) -> list[Path]:
-    wrote_files = []
-
-    for col_value in df[column].unique():
-        filtered_df = df[df[column] == col_value]
-
-        dest_basename = dest_path / f"{col_value}"
-        dest = writing_func(filtered_df, dest_basename)
-        wrote_files.append(dest)
-
-    return wrote_files
-
-
-def extract_sources(sources: list[Source], base_path: Path = Path()):
-    for src in sources:
-        filepath = base_path / src.filename
-        assert filepath.exists
-        yield src.filename, pd.read_excel(filepath, sheet_name=src.sheet_name)
-
-
-def split_duplicates(
-    df, origin: str, duplicated: dict[str, pd.DataFrame]
-) -> [pd.DataFrame, dict[str, pd.DataFrame]]:
-    duplicates = df.duplicated()
-    no_duplicates = df[~duplicates]
-    duplicated[origin] = df[duplicates]
-    return no_duplicates, duplicated
-
-
-def crg(history_path: Path, staging_path: Path, metadatas: dict, writing_func=to_csv):
-    duplicated = {}
-    wrote_files = []
-
-    for name, metadata in metadatas.items():
-        logger.debug(f"Processing {name}")
-        src_df = []
-        for filename, df in extract_sources(metadata.sources, history_path):
-            df, duplicated = split_duplicates(df, str(filename), duplicated)
-            src_df.append(df)
-
-        df = metadata.transformation(src_df, **metadata.extra_kwrds)
-
-        files = write_split_by(df, "Année", staging_path, writing_func)
-        wrote_files += files
-    return wrote_files
-
-
-def extract_cat(cat):
+def extract_cat(cat: pd.DataFrame):
     cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"]["Anciennes"])
     cat_trans = cat[cat["Nouvelles"] != "NE PAS IMPORTER"]
 
@@ -89,7 +22,9 @@ def extract_cat(cat):
     return trans, cat_drop
 
 
-def trans_2017_2021(dfs, **kwrds):
+def trans_2017_2021(
+    dfs: list[pd.DataFrame], stagging_columns: list[str], **kwrds
+) -> pd.DataFrame:
     df, cat = dfs
     cat_trans, cat_drop = extract_cat(cat)
 
@@ -107,10 +42,12 @@ def trans_2017_2021(dfs, **kwrds):
         Fournisseur="",
     )
 
-    return df
+    return df[stagging_columns]
 
 
-def trans_2022_charge(dfs, **kwrds):
+def trans_2022_charge(
+    dfs: list[pd.DataFrame], stagging_columns: list[str], **kwrds
+) -> pd.DataFrame:
     df = dfs[0]
     df = df.assign(
         Immeuble=df["immeuble"],
@@ -126,10 +63,12 @@ def trans_2022_charge(dfs, **kwrds):
         Régie="Oralia - Gelas",
         Libellé="",
     )
-    return df
+    return df[stagging_columns]
 
 
-def trans_2022_loc(dfs, **kwrds):
+def trans_2022_loc(
+    dfs: list[pd.DataFrame], stagging_columns: list[str], **kwrds
+) -> pd.DataFrame:
     df = dfs[0]
     df = df.assign(
         Immeuble=df["immeuble"],
@@ -144,10 +83,12 @@ def trans_2022_loc(dfs, **kwrds):
         Régie="Oralia - Gelas",
         Libellé="",
    )
-    return df
+    return df[stagging_columns]
 
 
-def trans_2023(dfs, year, **kwrds):
+def trans_2023(
+    dfs: list[pd.DataFrame], year: int, stagging_columns: list[str], **kwrds
+) -> pd.DataFrame:
     df = dfs[0]
     df = df.assign(
         Débit=df["Débit"].fillna(0),
@@ -155,43 +96,60 @@ def trans_2023(dfs, year, **kwrds):
         Lot=df["Immeuble"].astype(str) + df["Porte"].astype("str").str.zfill(2),
         Année=year,
     )
-    return df
+    return df[stagging_columns]
 
 
-METADATAS = {
+STAGGING_COLUMNS = [
+    "Régie",
+    "Immeuble",
+    "Porte",
+    "Lot",
+    "Année",
+    "Mois",
+    "Catégorie",
+    "Fournisseur",
+    "Libellé",
+    "Débit",
+    "Crédit",
+]
+
+FLUXES = {
     "2017 2021 - charge et locataire.xlsx": Flux(
         sources=[
-            Source(
+            ExcelSource(
                 filename="2017 2021 - charge et locataire.xlsx", sheet_name="DB CRG"
             ),
-            Source(
+            ExcelSource(
                 filename="2017 2021 - charge et locataire.xlsx",
                 sheet_name="Catégories",
             ),
         ],
         transformation=trans_2017_2021,
+        extra_kwrds={"stagging_columns": STAGGING_COLUMNS},
     ),
     "2022 - charge.xlsx": Flux(
         sources=[
-            Source(filename="2022 - charge.xlsx", sheet_name="Sheet1"),
+            ExcelSource(filename="2022 - charge.xlsx", sheet_name="Sheet1"),
         ],
         transformation=trans_2022_charge,
+        extra_kwrds={"stagging_columns": STAGGING_COLUMNS},
     ),
     "2022 - locataire.xlsx": Flux(
         sources=[
-            Source(filename="2022 - locataire.xlsx", sheet_name="Sheet1"),
+            ExcelSource(filename="2022 - locataire.xlsx", sheet_name="Sheet1"),
         ],
         transformation=trans_2022_loc,
+        extra_kwrds={"stagging_columns": STAGGING_COLUMNS},
     ),
     "2023 - charge et locataire.xlsx": Flux(
         sources=[
-            Source(
+            ExcelSource(
                 filename="2023 - charge et locataire.xlsx",
                 sheet_name="DB CRG 2023 ...",
            ),
         ],
         transformation=trans_2023,
-        extra_kwrds={"year": 2023},
+        extra_kwrds={"year": 2023, "stagging_columns": STAGGING_COLUMNS},
     ),
 }
 
@@ -208,5 +166,5 @@ if __name__ == "__main__":
     staging_crg_path = staging_path / "CRG"
     assert staging_crg_path.exists()
 
-    crg_files = crg(history_crg_path, staging_crg_path, METADATAS)
+    crg_files = consume_fluxes(FLUXES, history_crg_path, staging_crg_path)
     print(crg_files)
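
Note: every transformation above follows the same contract: it receives
the sources' DataFrames in declaration order plus the flux's extra_kwrds,
and must return a frame reindexed to the staging schema. A self-contained
sketch of that contract; the constant 2024 year and empty cell values are
placeholders, not data from this patch.

    import pandas as pd

    STAGGING_COLUMNS = ["Régie", "Immeuble", "Porte", "Lot", "Année", "Mois",
                        "Catégorie", "Fournisseur", "Libellé", "Débit", "Crédit"]


    def trans_example(
        dfs: list[pd.DataFrame], stagging_columns: list[str], **kwrds
    ) -> pd.DataFrame:
        # Derive or clean columns here; a constant year stands in for real logic.
        df = dfs[0].assign(Année=2024)
        # Restricting to stagging_columns fixes both the set and the order
        # of columns written to the staging CSVs.
        return df[stagging_columns]


    raw = pd.DataFrame({col: [""] for col in STAGGING_COLUMNS if col != "Année"})
    out = trans_example([raw], STAGGING_COLUMNS)
    assert list(out.columns) == STAGGING_COLUMNS
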
diff --git a/scripts/stagging_gold.py b/scripts/stagging_gold.py
new file mode 100644
index 0000000..2916e1e
--- /dev/null
+++ b/scripts/stagging_gold.py
@@ -0,0 +1,69 @@
+import logging
+from collections.abc import Callable
+from pathlib import Path
+
+import pandas as pd
+
+from scripts.flux import CSVSource, Flux, consume_fluxes
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.DEBUG)
+
+
+def feature_crg(dfs: list[pd.DataFrame]) -> pd.DataFrame:
+    df = dfs[0]
+    df = df.assign(
+        Impact=df["Crédit"] - df["Débit"],
+    )
+    return df
+
+
+GOLD_COLUMNS = [
+    "Régie",
+    "Immeuble",
+    "Porte",
+    "Lot",
+    "Année",
+    "Mois",
+    "Catégorie",
+    "Fournisseur",
+    "Libellé",
+    "Débit",
+    "Crédit",
+    "Impact",
+]
+
+
+def build_crg_fluxes(
+    crg_path: Path, pattern: str, transformation: Callable, csv_options: dict | None = None
+) -> dict[str, Flux]:
+    fluxes = {}
+    for crg in crg_path.glob(pattern):
+        fluxes[f"CRG - {crg}"] = Flux(
+            sources=[CSVSource(filename=crg.name, options=csv_options or {})],
+            transformation=transformation,
+        )
+    return fluxes
+
+
+if __name__ == "__main__":
+    data_path = Path("datas/")
+    assert data_path.exists()
+
+    staging_path = data_path / "staging"
+    assert staging_path.exists()
+    staging_crg_path = staging_path / "CRG"
+    assert staging_crg_path.exists()
+
+    gold_path = data_path / "gold"
+    assert gold_path.exists()
+    gold_crg_path = gold_path / "CRG"
+    assert gold_crg_path.exists()
+
+    fluxes = build_crg_fluxes(
+        crg_path=staging_crg_path, pattern="*.csv", transformation=feature_crg
+    )
+    crg_files = consume_fluxes(
+        fluxes=fluxes, origin_path=staging_crg_path, dest_path=gold_crg_path
+    )
+    print(crg_files)
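
Note: a minimal smoke test for the new staging-to-gold step, assuming
the patch is applied and the datas/ tree exists as asserted in the
scripts above.

    from pathlib import Path

    import pandas as pd

    from scripts.flux import consume_fluxes
    from scripts.stagging_gold import build_crg_fluxes, feature_crg

    staging_crg = Path("datas/staging/CRG")
    gold_crg = Path("datas/gold/CRG")

    fluxes = build_crg_fluxes(crg_path=staging_crg, pattern="*.csv",
                              transformation=feature_crg)
    written = consume_fluxes(fluxes=fluxes, origin_path=staging_crg,
                             dest_path=gold_crg)

    # Every gold partition should satisfy Impact = Crédit - Débit.
    for path in written:
        df = pd.read_csv(path)
        assert (df["Impact"] == df["Crédit"] - df["Débit"]).all()

One caveat worth keeping in mind: to_csv appends when the destination
file already exists, so re-running a flux without clearing the gold
directory duplicates rows; the same applies to the staging step.
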