import logging from collections.abc import Callable from pathlib import Path import pandas as pd from scripts.flux import ( CSVSource, Destination, Flux, SplitDestination, Transformation, consume_fluxes, ) logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) def build_lots(dfs: list[pd.DataFrame]) -> pd.DataFrame: df = pd.concat(dfs) df = df.assign( Impact=df["Crédit"] - df["Débit"], ) return df def build_pnl(dfs: list[pd.DataFrame], year: int) -> pd.DataFrame: df = pd.concat(dfs) df = df[df["Année"] == year] pt = df.groupby(["Catégorie", "Mois"]).agg("sum").unstack().fillna(0) pt.columns = [c[1] for c in pt.columns] pt.reset_index(["Catégorie"]) return pt def build_pnl_flux(year: int) -> Flux: return Flux( sources=[ CSVSource(filename=f"CRG/crg-{year}.csv"), CSVSource(filename=f"banque/banque-{year}.csv"), ], transformation=Transformation( function=build_pnl, extra_kwrds={"year": year}, ), destination=Destination(name=f"pnl/{year}"), ) if __name__ == "__main__": data_path = Path("datas/") assert data_path.exists() gold_path = data_path / "gold" assert gold_path.exists() gold_crg_path = gold_path / "CRG" assert gold_crg_path.exists() mart_path = data_path / "datamart" assert mart_path.exists() lot_fluxes = { "Lots": Flux( sources=[CSVSource(filename="CRG/crg-*.csv")], transformation=Transformation(function=build_lots), destination=SplitDestination(name="Lot/lot", split_column="Lot"), ), } years = list(range(2017, 2024)) # pnl_fluxes = {f"pnl-{year}": build_pnl_flux(year) for year in years} pnl_fluxes = {} files = consume_fluxes( fluxes={**lot_fluxes, **pnl_fluxes}, origin_path=gold_path, dest_path=mart_path ) print(files)