From b60fa3be17ca5c7606adcfcd48b80725e00ad50d Mon Sep 17 00:00:00 2001
From: Bertrand Benjamin
Date: Mon, 15 Apr 2024 11:59:32 +0200
Subject: [PATCH] Feat: add excel export for mart

---
 scripts/__main__.py  |  6 +++-
 scripts/flux.py      | 85 ++++++++++++++++++++++++++++++--------------
 scripts/gold_mart.py |  9 +++--
 3 files changed, 71 insertions(+), 29 deletions(-)

diff --git a/scripts/__main__.py b/scripts/__main__.py
index a79f7ea..a6845b8 100644
--- a/scripts/__main__.py
+++ b/scripts/__main__.py
@@ -79,7 +79,11 @@ def feature():
 def datamart():
     from .gold_mart import FLUXES_LOT
 
-    consume_fluxes(fluxes=FLUXES_LOT, origin_path=GOLD_PATH, dest_path=MART_PATH)
+    consume_fluxes(
+        fluxes=FLUXES_LOT,
+        origin_path=GOLD_PATH,
+        dest_path=MART_PATH,
+    )
 
 
 if __name__ == "__main__":
diff --git a/scripts/flux.py b/scripts/flux.py
index d52e8fc..1c5b772 100644
--- a/scripts/flux.py
+++ b/scripts/flux.py
@@ -4,7 +4,7 @@ from collections.abc import Callable
 from pathlib import Path
 
 import pandas as pd
-from pydantic import BaseModel
+from pydantic import BaseModel, Field
 
 
 class Source(BaseModel):
@@ -38,21 +38,51 @@ class Transformation(BaseModel):
     extra_kwrds: dict = {}
 
 
+def to_csv(df, dest_basename: Path) -> Path:
+    dest = dest_basename.parent / (dest_basename.stem + ".csv")
+    if dest.exists():
+        df.to_csv(dest, mode="a", header=False, index=False)
+    else:
+        df.to_csv(dest, index=False)
+    return dest
+
+
+def to_excel(df, dest_basename: Path) -> Path:
+    dest = dest_basename.parent / (dest_basename.stem + ".xlsx")
+    if dest.exists():
+        raise ValueError(f"The destination already exists: {dest}")
+    else:
+        df.to_excel(dest)
+    return dest
+
+
 class Destination(BaseModel):
     name: str
+    writer: Callable = Field(to_csv)
+
+    def _write(
+        self,
+        df: pd.DataFrame,
+        dest_basename: Path,
+        writing_func: Callable | None = None,
+    ) -> Path:
+        if writing_func is None:
+            writing_func = self.writer
+
+        return writing_func(df, dest_basename)
 
     def write(
-        self, df: pd.DataFrame, dest_path: Path, writing_func: Callable
+        self, df: pd.DataFrame, dest_path: Path, writing_func: Callable | None = None
     ) -> list[Path]:
         dest_basename = dest_path / self.name
-        return [writing_func(df, dest_basename)]
+        return [self._write(df, dest_basename, writing_func)]
 
 
 class SplitDestination(Destination):
     split_column: str
 
     def write(
-        self, df: pd.DataFrame, dest_path: Path, writing_func: Callable
+        self, df: pd.DataFrame, dest_path: Path, writing_func: Callable | None = None
     ) -> list[Path]:
         wrote_files = []
 
@@ -60,7 +90,7 @@ class SplitDestination(Destination):
             filtered_df = df[df[self.split_column] == col_value]
             dest_basename = dest_path / f"{self.name}-{col_value}"
 
-            dest = writing_func(filtered_df, dest_basename)
+            dest = self._write(filtered_df, dest_basename, writing_func)
             wrote_files.append(dest)
 
         return wrote_files
@@ -72,15 +102,6 @@ class Flux(BaseModel):
     destination: Destination
 
 
-def to_csv(df, dest_basename: Path) -> Path:
-    dest = dest_basename.parent / (dest_basename.stem + ".csv")
-    if dest.exists():
-        df.to_csv(dest, mode="a", header=False, index=False)
-    else:
-        df.to_csv(dest, index=False)
-    return dest
-
-
 def write_split_by(
     df: pd.DataFrame, column: str, dest_path: Path, name: str, writing_func
 ) -> list[Path]:
@@ -119,26 +140,38 @@ def split_duplicates(
     return no_duplicates, duplicated
 
 
+def consume_flux(
+    name: str,
+    flux: Flux,
+    origin_path: Path,
+    dest_path: Path,
+    duplicated={},
+):
+    logging.info(f"Consume {name}")
+    src_df = []
+    for filename, df in extract_sources(flux.sources, origin_path):
+        logging.info(f"Extracting {filename}")
+        df, duplicated = split_duplicates(df, str(filename), duplicated)
+        src_df.append(df)
+
+    logging.info(f"Execute {flux.transformation.function.__name__}")
+    df = flux.transformation.function(src_df, **flux.transformation.extra_kwrds)
+
+    files = flux.destination.write(df, dest_path)
+
+    logging.info(f"{files} written")
+    return files
+
+
 def consume_fluxes(
     fluxes: dict[str, Flux],
     origin_path: Path,
     dest_path: Path,
-    writing_func=to_csv,
 ):
     duplicated = {}
     wrote_files = []
     for name, flux in fluxes.items():
-        logging.info(f"Consume {name}")
-        src_df = []
-        for filename, df in extract_sources(flux.sources, origin_path):
-            logging.info(f"Extracting {filename}")
-            df, duplicated = split_duplicates(df, str(filename), duplicated)
-            src_df.append(df)
-
-        logging.info(f"Execute {flux.transformation.function.__name__}")
-        df = flux.transformation.function(src_df, **flux.transformation.extra_kwrds)
-        files = flux.destination.write(df, dest_path, writing_func)
-        logging.info(f"{files} written")
+        files = consume_flux(name, flux, origin_path, dest_path, duplicated)
         wrote_files += files
 
     return wrote_files
diff --git a/scripts/gold_mart.py b/scripts/gold_mart.py
index 3e4084d..d791684 100644
--- a/scripts/gold_mart.py
+++ b/scripts/gold_mart.py
@@ -11,6 +11,7 @@ from scripts.flux import (
     SplitDestination,
     Transformation,
     consume_fluxes,
+    to_excel,
 )
 
 logger = logging.getLogger(__name__)
@@ -26,7 +27,9 @@ FLUXES_LOT = {
     "Lots": Flux(
         sources=[CSVSource(filename="CRG/crg-*.csv")],
         transformation=Transformation(function=build_lots),
-        destination=SplitDestination(name="Lot/lot", split_column="Lot"),
+        destination=SplitDestination(
+            name="Lot/lot", split_column="Lot", writer=to_excel
+        ),
     ),
 }
 
@@ -75,6 +78,8 @@ if __name__ == "__main__":
     pnl_fluxes = {}
 
     files = consume_fluxes(
-        fluxes=pnl_fluxes, origin_path=gold_path, dest_path=mart_path
+        fluxes=pnl_fluxes,
+        origin_path=gold_path,
+        dest_path=mart_path,
    )
     print(files)