From d0961b0909cb10f60ddd4acf28f9eba4e3cca96e Mon Sep 17 00:00:00 2001
From: Bertrand Benjamin
Date: Sun, 3 Mar 2024 07:05:10 +0100
Subject: [PATCH] Feat: add destination in flux definition

---
 scripts/flux.py             | 31 +++++++++++++++++++++++++------
 scripts/history_stagging.py | 16 ++++++++++++++--
 scripts/stagging_gold.py    |  9 +++++----
 3 files changed, 44 insertions(+), 12 deletions(-)

diff --git a/scripts/flux.py b/scripts/flux.py
index 037eac5..368ea27 100644
--- a/scripts/flux.py
+++ b/scripts/flux.py
@@ -36,14 +36,20 @@ class CSVSource(Source):
         return pd.read_csv(filepath, **self.options)
 
 
+class Destination(BaseModel):
+    name: str
+
+
 class Flux(BaseModel):
     sources: list[Source]
     transformation: Callable
     extra_kwrds: dict = {}
+    destination: Destination
+    split_destination: str = ""
 
 
-def to_csv(df, dest_basename):
-    dest = dest_basename.parent / (dest_basename.name + ".csv")
+def to_csv(df, dest_basename: Path) -> Path:
+    dest = dest_basename.parent / (dest_basename.stem + ".csv")
     if dest.exists():
         df.to_csv(dest, mode="a", header=False, index=False)
     else:
@@ -52,14 +58,14 @@ def to_csv(df, dest_basename):
 
 
 def write_split_by(
-    df: pd.DataFrame, column: str, dest_path: Path, writing_func
+    df: pd.DataFrame, column: str, dest_path: Path, name: str, writing_func
 ) -> list[Path]:
     wrote_files = []
 
     for col_value in df[column].unique():
         filtered_df = df[df[column] == col_value]
 
-        dest_basename = dest_path / f"{col_value}"
+        dest_basename = dest_path / f"{name}-{col_value}"
         dest = writing_func(filtered_df, dest_basename)
         wrote_files.append(dest)
 
@@ -89,7 +95,10 @@ def split_duplicates(
 
 
 def consume_fluxes(
-    fluxes: dict[str, Flux], origin_path: Path, dest_path: Path, writing_func=to_csv
+    fluxes: dict[str, Flux],
+    origin_path: Path,
+    dest_path: Path,
+    writing_func=to_csv,
 ):
     duplicated = {}
     wrote_files = []
@@ -104,6 +113,16 @@ def consume_fluxes(
         df = flux.transformation(src_df, **flux.extra_kwrds)
 
-        files = write_split_by(df, "Année", dest_path, writing_func)
+        if flux.split_destination:
+            files = write_split_by(
+                df=df,
+                column=flux.split_destination,
+                dest_path=dest_path,
+                name=flux.destination.name,
+                writing_func=writing_func,
+            )
+        else:
+            dest_basename = dest_path / flux.destination.name
+            files = [writing_func(df, dest_basename)]
         wrote_files += files
 
     return wrote_files
diff --git a/scripts/history_stagging.py b/scripts/history_stagging.py
index 4dc8287..408a6a7 100644
--- a/scripts/history_stagging.py
+++ b/scripts/history_stagging.py
@@ -5,7 +5,7 @@ import pandas as pd
 
 from scripts.flux import consume_fluxes
 
-from .flux import ExcelSource, Flux
+from .flux import Destination, ExcelSource, Flux
 
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.DEBUG)
@@ -126,6 +126,8 @@ FLUXES = {
         ],
         transformation=trans_2017_2021,
         extra_kwrds={"stagging_columns": STAGGING_COLUMNS},
+        destination=Destination(name="crg"),
+        split_destination="Année",
     ),
     "2022 - charge.xlsx": Flux(
         sources=[
@@ -133,6 +135,8 @@ FLUXES = {
         ],
         transformation=trans_2022_charge,
         extra_kwrds={"stagging_columns": STAGGING_COLUMNS},
+        destination=Destination(name="crg"),
+        split_destination="Année",
     ),
     "2022 - locataire.xlsx": Flux(
         sources=[
@@ -140,6 +144,8 @@ FLUXES = {
         ],
         transformation=trans_2022_loc,
         extra_kwrds={"stagging_columns": STAGGING_COLUMNS},
+        destination=Destination(name="crg"),
+        split_destination="Année",
     ),
     "2023 - charge et locataire.xlsx": Flux(
         sources=[
@@ -150,6 +156,8 @@ FLUXES = {
         ],
         transformation=trans_2023,
         extra_kwrds={"year": 2023, "stagging_columns": STAGGING_COLUMNS},
+        destination=Destination(name="crg"),
+        split_destination="Année",
     ),
 }
 
@@ -166,5 +174,9 @@ if __name__ == "__main__":
     staging_crg_path = staging_path / "CRG"
     assert staging_crg_path.exists()
 
-    crg_files = consume_fluxes(FLUXES, history_crg_path, staging_crg_path)
+    crg_files = consume_fluxes(
+        fluxes=FLUXES,
+        origin_path=history_crg_path,
+        dest_path=staging_crg_path,
+    )
     print(crg_files)
diff --git a/scripts/stagging_gold.py b/scripts/stagging_gold.py
index 2916e1e..937298c 100644
--- a/scripts/stagging_gold.py
+++ b/scripts/stagging_gold.py
@@ -4,7 +4,7 @@ from pathlib import Path
 
 import pandas as pd
 
-from scripts.flux import CSVSource, Flux, consume_fluxes
+from scripts.flux import CSVSource, Destination, Flux, consume_fluxes
 
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.DEBUG)
@@ -38,10 +38,11 @@ def build_crg_fluxes(
     crg_path: Path, pattern: str, transformation: Callable, csv_options: dict = {}
 ) -> dict[str, Flux]:
     fluxes = {}
-    for crg in crg_path.glob(pattern):
-        fluxes[f"CRG - {crg}"] = Flux(
-            sources=[CSVSource(filename=crg.name, options=csv_options)],
+    for file in crg_path.glob(pattern):
+        fluxes[f"CRG - {file.name}"] = Flux(
+            sources=[CSVSource(filename=file.name, options=csv_options)],
             transformation=transformation,
+            destination=Destination(name=file.name),
         )
     return fluxes
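
Note (usage sketch, not part of the patch): a minimal example of how the new destination and split_destination fields drive consume_fluxes after this change. The flux key, file name, paths, and the identity transformation below are illustrative assumptions, not code from this repository; only Flux, Destination, CSVSource and consume_fluxes come from scripts/flux.py.

    from pathlib import Path

    from scripts.flux import CSVSource, Destination, Flux, consume_fluxes

    # Hypothetical flux: read one CSV unchanged and split the output by the
    # "Année" column, yielding files named "crg-<value>.csv" in dest_path.
    fluxes = {
        "example.csv": Flux(
            sources=[CSVSource(filename="example.csv", options={})],
            transformation=lambda df: df,  # identity transform, for illustration only
            destination=Destination(name="crg"),
            split_destination="Année",  # leave as "" to write a single "crg.csv" instead
        ),
    }

    written = consume_fluxes(
        fluxes=fluxes,
        origin_path=Path("raw"),       # assumed directory layout
        dest_path=Path("staging"),     # assumed directory layout
    )
    print(written)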