import logging
from collections.abc import Callable
from pathlib import Path

import pandas as pd

from scripts.flux import CSVSource, Destination, Flux, Transformation, consume_fluxes

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


def feature_crg(dfs: list[pd.DataFrame]) -> pd.DataFrame:
    """Add an Impact column (credit minus debit) to the first staging DataFrame."""
    df = dfs[0]
    df = df.assign(
        Impact=df["Crédit"] - df["Débit"],
    )
    return df


# Expected column layout of the gold-level CRG tables.
GOLD_COLUMNS = [
    "Régie",
    "Immeuble",
    "Porte",
    "Lot",
    "Année",
    "Mois",
    "Catégorie",
    "Fournisseur",
    "Libellé",
    "Débit",
    "Crédit",
    "Impact",
]


def build_crg_fluxes(
    crg_path: Path,
    pattern: str,
    transformation: Callable[[list[pd.DataFrame]], pd.DataFrame],
    csv_options: dict | None = None,
) -> dict[str, Flux]:
    """Build one Flux per CSV file matching `pattern` under `crg_path`."""
    fluxes = {}
    for file in crg_path.glob(pattern):
        fluxes[f"CRG - {file.name}"] = Flux(
            sources=[CSVSource(filename=file.name, options=csv_options or {})],
            transformation=Transformation(function=transformation),
            destination=Destination(name=file.name),
        )
    return fluxes


if __name__ == "__main__":
    # Staging (input) and gold (output) folders must already exist.
    data_path = Path("datas/")
    assert data_path.exists()

    staging_path = data_path / "staging"
    assert staging_path.exists()
    staging_crg_path = staging_path / "CRG"
    assert staging_crg_path.exists()

    gold_path = data_path / "gold"
    assert gold_path.exists()
    gold_crg_path = gold_path / "CRG"
    assert gold_crg_path.exists()

    # Build one flux per staging CSV, then run them all into the gold folder.
    fluxes = build_crg_fluxes(
        crg_path=staging_crg_path, pattern="*.csv", transformation=feature_crg
    )
    crg_files = consume_fluxes(
        fluxes=fluxes, origin_path=staging_crg_path, dest_path=gold_crg_path
    )
    print(crg_files)