diff --git a/scripts/__main__.py b/scripts/__main__.py index dcfa96c..a79f7ea 100644 --- a/scripts/__main__.py +++ b/scripts/__main__.py @@ -1,5 +1,22 @@ +import logging +from logging.config import dictConfig +from pathlib import Path + import click +from .flux import consume_fluxes + +DATA_PATH = Path("datas/") +assert DATA_PATH.exists() +HISTORY_PATH = DATA_PATH / "Histoire" +assert HISTORY_PATH.exists() +STAGING_PATH = DATA_PATH / "staging" +assert STAGING_PATH.exists() +GOLD_PATH = DATA_PATH / "gold" +assert GOLD_PATH.exists() +MART_PATH = DATA_PATH / "datamart" +assert MART_PATH.exists() + @click.group() @click.option("--debug/--no-debug", default=False) @@ -29,14 +46,41 @@ def main(debug): @main.command() def ingest(): - pass + from .history_stagging import FLUXES_CRG + + history_crg_path = HISTORY_PATH / "CRG" + assert history_crg_path.exists() + staging_crg_path = STAGING_PATH / "CRG" + assert staging_crg_path.exists() + consume_fluxes( + fluxes=FLUXES_CRG, + origin_path=history_crg_path, + dest_path=staging_crg_path, + ) @main.command() def feature(): - pass + from .stagging_gold import FLUXES_CRG + + staging_crg_path = STAGING_PATH / "CRG" + assert staging_crg_path.exists() + gold_crg_path = GOLD_PATH / "CRG" + assert gold_crg_path.exists() + + consume_fluxes( + fluxes=FLUXES_CRG(staging_crg_path), + origin_path=staging_crg_path, + dest_path=gold_crg_path, + ) @main.command() def datamart(): - pass + from .gold_mart import FLUXES_LOT + + consume_fluxes(fluxes=FLUXES_LOT, origin_path=GOLD_PATH, dest_path=MART_PATH) + + +if __name__ == "__main__": + main() diff --git a/scripts/flux.py b/scripts/flux.py index b20fa61..d52e8fc 100644 --- a/scripts/flux.py +++ b/scripts/flux.py @@ -6,9 +6,6 @@ from pathlib import Path import pandas as pd from pydantic import BaseModel -logger = logging.getLogger(__name__) -logger.setLevel(logging.DEBUG) - class Source(BaseModel): filename: str @@ -23,7 +20,7 @@ class ExcelSource(Source): def get_df(self, base_path: Path) -> pd.DataFrame: filepath = base_path / self.filename - logger.debug(f"Get content of {filepath}") + logging.debug(f"Get content of {filepath}") return pd.read_excel(filepath, sheet_name=self.sheet_name) @@ -32,7 +29,7 @@ class CSVSource(Source): def get_df(self, base_path: Path) -> pd.DataFrame: filepath = base_path / self.filename - logger.debug(f"Get content of {filepath}") + logging.debug(f"Get content of {filepath}") return pd.read_csv(filepath, **self.options) @@ -132,17 +129,16 @@ def consume_fluxes( wrote_files = [] for name, flux in fluxes.items(): - print(f"Consume {name}") - logger.info(f"Processing flux {name}") + logging.info(f"Consume {name}") src_df = [] for filename, df in extract_sources(flux.sources, origin_path): - print(f"Extracting {filename}") + logging.info(f"Extracting {filename}") df, duplicated = split_duplicates(df, str(filename), duplicated) src_df.append(df) - print(f"Execute {flux.transformation.function.__name__}") + logging.info(f"Execute {flux.transformation.function.__name__}") df = flux.transformation.function(src_df, **flux.transformation.extra_kwrds) files = flux.destination.write(df, dest_path, writing_func) - print(files) + logging.info(f"{files} written") wrote_files += files return wrote_files diff --git a/scripts/gold_mart.py b/scripts/gold_mart.py index cc990ea..50ccec7 100644 --- a/scripts/gold_mart.py +++ b/scripts/gold_mart.py @@ -25,6 +25,15 @@ def build_lots(dfs: list[pd.DataFrame]) -> pd.DataFrame: return df +FLUXES_LOT = { + "Lots": Flux( + sources=[CSVSource(filename="CRG/crg-*.csv")], + transformation=Transformation(function=build_lots), + destination=SplitDestination(name="Lot/lot", split_column="Lot"), + ), +} + + def build_pnl(dfs: list[pd.DataFrame], year: int) -> pd.DataFrame: df = pd.concat(dfs) df = df[df["Année"] == year] @@ -60,18 +69,15 @@ if __name__ == "__main__": mart_path = data_path / "datamart" assert mart_path.exists() - lot_fluxes = { - "Lots": Flux( - sources=[CSVSource(filename="CRG/crg-*.csv")], - transformation=Transformation(function=build_lots), - destination=SplitDestination(name="Lot/lot", split_column="Lot"), - ), - } + files = consume_fluxes( + fluxes=FLUXES_LOT, origin_path=gold_path, dest_path=mart_path + ) + years = list(range(2017, 2024)) # pnl_fluxes = {f"pnl-{year}": build_pnl_flux(year) for year in years} pnl_fluxes = {} files = consume_fluxes( - fluxes={**lot_fluxes, **pnl_fluxes}, origin_path=gold_path, dest_path=mart_path + fluxes=pnl_fluxes, origin_path=gold_path, dest_path=mart_path ) print(files) diff --git a/scripts/history_stagging.py b/scripts/history_stagging.py index c1575a1..49bcd2f 100644 --- a/scripts/history_stagging.py +++ b/scripts/history_stagging.py @@ -119,7 +119,7 @@ STAGGING_COLUMNS = [ "Crédit", ] -FLUXES = { +FLUXES_CRG = { "2017 2021 - charge et locataire.xlsx": Flux( sources=[ ExcelSource( @@ -185,7 +185,7 @@ if __name__ == "__main__": assert staging_crg_path.exists() crg_files = consume_fluxes( - fluxes=FLUXES, + fluxes=FLUXES_CRG, origin_path=history_crg_path, dest_path=staging_crg_path, ) diff --git a/scripts/stagging_gold.py b/scripts/stagging_gold.py index d24c0c2..f26b9a1 100644 --- a/scripts/stagging_gold.py +++ b/scripts/stagging_gold.py @@ -48,6 +48,12 @@ def build_crg_fluxes( return fluxes +def FLUXES_CRG(staging_crg_path: Path): + return build_crg_fluxes( + crg_path=staging_crg_path, pattern="*.csv", transformation=feature_crg + ) + + if __name__ == "__main__": data_path = Path("datas/") assert data_path.exists() @@ -62,10 +68,9 @@ if __name__ == "__main__": gold_crg_path = gold_path / "CRG" assert gold_crg_path.exists() - fluxes = build_crg_fluxes( - crg_path=staging_crg_path, pattern="*.csv", transformation=feature_crg - ) crg_files = consume_fluxes( - fluxes=fluxes, origin_path=staging_crg_path, dest_path=gold_crg_path + fluxes=FLUXES_CRG(staging_crg_path), + origin_path=staging_crg_path, + dest_path=gold_crg_path, ) print(crg_files)