diff --git a/scripts/flux.py b/scripts/flux.py index e2e5e58..5643623 100644 --- a/scripts/flux.py +++ b/scripts/flux.py @@ -36,16 +36,43 @@ class CSVSource(Source): return pd.read_csv(filepath, **self.options) +class Transformation(BaseModel): + function: Callable + extra_kwrds: dict = {} + + class Destination(BaseModel): name: str + def write( + self, df: pd.DataFrame, dest_path: Path, writing_func: Callable + ) -> list[Path]: + dest_basename = dest_path / self.name + return [writing_func(df, dest_basename)] + + +class SplitDestination(Destination): + split_column: str + + def write( + self, df: pd.DataFrame, dest_path: Path, writing_func: Callable + ) -> list[Path]: + wrote_files = [] + + for col_value in df[self.split_column].unique(): + filtered_df = df[df[self.split_column] == col_value] + + dest_basename = dest_path / f"{self.name}-{col_value}" + dest = writing_func(filtered_df, dest_basename) + wrote_files.append(dest) + + return wrote_files + class Flux(BaseModel): sources: list[Source] - transformation: Callable - extra_kwrds: dict = {} + transformation: Transformation destination: Destination - split_destination: str = "" def to_csv(df, dest_basename: Path) -> Path: @@ -112,18 +139,8 @@ def consume_fluxes( df, duplicated = split_duplicates(df, str(filename), duplicated) src_df.append(df) - df = flux.transformation(src_df, **flux.extra_kwrds) + df = flux.transformation.function(src_df, **flux.transformation.extra_kwrds) - if flux.split_destination: - files = write_split_by( - df=df, - column=flux.split_destination, - dest_path=dest_path, - name=flux.destination.name, - writing_func=writing_func, - ) - else: - dest_basename = dest_path / flux.destination.name - files = [writing_func(df, dest_basename)] + files = flux.destination.write(df, dest_path, writing_func) wrote_files += files return wrote_files diff --git a/scripts/gold_mart.py b/scripts/gold_mart.py index 37d6007..450b789 100644 --- a/scripts/gold_mart.py +++ b/scripts/gold_mart.py @@ -4,7 +4,14 @@ from pathlib import Path import pandas as pd -from scripts.flux import CSVSource, Destination, Flux, consume_fluxes +from scripts.flux import ( + CSVSource, + Destination, + Flux, + SplitDestination, + Transformation, + consume_fluxes, +) logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -54,9 +61,8 @@ if __name__ == "__main__": lot_fluxes = { "Lots": Flux( sources=[CSVSource(filename="CRG/crg-*.csv")], - transformation=build_lots, - destination=Destination(name="Lot/lot"), - split_destination="Lot", + transformation=Transformation(function=build_lots), + destination=SplitDestination(name="Lot/lot", split_column="Lot"), ), } years = list(range(2017, 2024)) diff --git a/scripts/history_stagging.py b/scripts/history_stagging.py index 408a6a7..a3d7645 100644 --- a/scripts/history_stagging.py +++ b/scripts/history_stagging.py @@ -5,7 +5,7 @@ import pandas as pd from scripts.flux import consume_fluxes -from .flux import Destination, ExcelSource, Flux +from .flux import Destination, ExcelSource, Flux, SplitDestination, Transformation logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -124,28 +124,31 @@ FLUXES = { sheet_name="Catégories", ), ], - transformation=trans_2017_2021, - extra_kwrds={"stagging_columns": STAGGING_COLUMNS}, - destination=Destination(name="crg"), - split_destination="Année", + transformation=Transformation( + function=trans_2017_2021, + extra_kwrds={"stagging_columns": STAGGING_COLUMNS}, + ), + destination=SplitDestination(name="crg", split_column="Année"), ), "2022 - charge.xlsx": Flux( sources=[ ExcelSource(filename="2022 - charge.xlsx", sheet_name="Sheet1"), ], - transformation=trans_2022_charge, - extra_kwrds={"stagging_columns": STAGGING_COLUMNS}, - destination=Destination(name="crg"), - split_destination="Année", + transformation=Transformation( + function=trans_2022_charge, + extra_kwrds={"stagging_columns": STAGGING_COLUMNS}, + ), + destination=SplitDestination(name="crg", split_column="Année"), ), "2022 - locataire.xlsx": Flux( sources=[ ExcelSource(filename="2022 - locataire.xlsx", sheet_name="Sheet1"), ], - transformation=trans_2022_loc, - extra_kwrds={"stagging_columns": STAGGING_COLUMNS}, - destination=Destination(name="crg"), - split_destination="Année", + transformation=Transformation( + function=trans_2022_loc, + extra_kwrds={"stagging_columns": STAGGING_COLUMNS}, + ), + destination=SplitDestination(name="crg", split_column="Année"), ), "2023 - charge et locataire.xlsx": Flux( sources=[ @@ -154,10 +157,11 @@ FLUXES = { sheet_name="DB CRG 2023 ...", ), ], - transformation=trans_2023, - extra_kwrds={"year": 2023, "stagging_columns": STAGGING_COLUMNS}, - destination=Destination(name="crg"), - split_destination="Année", + transformation=Transformation( + function=trans_2023, + extra_kwrds={"year": 2023, "stagging_columns": STAGGING_COLUMNS}, + ), + destination=SplitDestination(name="crg", split_column="Année"), ), } diff --git a/scripts/stagging_gold.py b/scripts/stagging_gold.py index 937298c..0886d04 100644 --- a/scripts/stagging_gold.py +++ b/scripts/stagging_gold.py @@ -4,7 +4,7 @@ from pathlib import Path import pandas as pd -from scripts.flux import CSVSource, Destination, Flux, consume_fluxes +from scripts.flux import CSVSource, Destination, Flux, Transformation, consume_fluxes logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -41,7 +41,7 @@ def build_crg_fluxes( for file in crg_path.glob(pattern): fluxes[f"CRG - {file.name}"] = Flux( sources=[CSVSource(filename=file.name, options=csv_options)], - transformation=transformation, + transformation=Transformation(function=transformation), destination=Destination(name=file.name), ) return fluxes