import logging
from abc import abstractmethod
from collections.abc import Callable, Iterator
from pathlib import Path

import pandas as pd
from pydantic import BaseModel

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


class Source(BaseModel):
    """Base class for a tabular data source identified by a filename."""

    filename: str

    @abstractmethod
    def get_df(self, base_path: Path) -> pd.DataFrame:
        raise NotImplementedError


class ExcelSource(Source):
    sheet_name: str

    def get_df(self, base_path: Path) -> pd.DataFrame:
        filepath = base_path / self.filename
        logger.debug(f"Get content of {filepath}")
        return pd.read_excel(filepath, sheet_name=self.sheet_name)


class CSVSource(Source):
    options: dict

    def get_df(self, base_path: Path) -> pd.DataFrame:
        filepath = base_path / self.filename
        logger.debug(f"Get content of {filepath}")
        return pd.read_csv(filepath, **self.options)


class Flux(BaseModel):
    """A named pipeline step: a list of sources fed into one transformation."""

    sources: list[Source]
    transformation: Callable
    extra_kwargs: dict = {}


def to_csv(df: pd.DataFrame, dest_basename: Path) -> Path:
    """Append to an existing CSV (without repeating the header) or create it."""
    dest = dest_basename.with_name(dest_basename.name + ".csv")
    if dest.exists():
        df.to_csv(dest, mode="a", header=False, index=False)
    else:
        df.to_csv(dest, index=False)
    return dest


def write_split_by(
    df: pd.DataFrame,
    column: str,
    dest_path: Path,
    writing_func: Callable[[pd.DataFrame, Path], Path],
) -> list[Path]:
    """Write one file per distinct value of `column`, named after that value."""
    written_files = []
    for col_value in df[column].unique():
        filtered_df = df[df[column] == col_value]
        dest_basename = dest_path / f"{col_value}"
        dest = writing_func(filtered_df, dest_basename)
        written_files.append(dest)
    return written_files


def extract_sources(
    sources: list[Source], base_path: Path = Path()
) -> Iterator[tuple[str, pd.DataFrame]]:
    """Yield (filename, DataFrame) pairs, expanding glob patterns on the way."""
    for src in sources:
        if "*" in src.filename:
            # Expand the pattern relative to base_path into concrete sources,
            # keeping filenames relative so `base_path / filename` stays valid.
            expanded_src = [
                src.model_copy(update={"filename": str(p.relative_to(base_path))})
                for p in base_path.glob(src.filename)
            ]
            yield from extract_sources(expanded_src, base_path)
        else:
            filepath = base_path / src.filename
            assert filepath.exists(), f"Missing source file: {filepath}"
            yield src.filename, src.get_df(base_path)


def split_duplicates(
    df: pd.DataFrame, origin: str, duplicated: dict[str, pd.DataFrame]
) -> tuple[pd.DataFrame, dict[str, pd.DataFrame]]:
    """Strip duplicated rows from `df`, recording them under their origin."""
    duplicates = df.duplicated()
    no_duplicates = df[~duplicates]
    duplicated[origin] = df[duplicates]
    return no_duplicates, duplicated


def consume_fluxes(
    fluxes: dict[str, Flux],
    origin_path: Path,
    dest_path: Path,
    writing_func: Callable[[pd.DataFrame, Path], Path] = to_csv,
) -> list[Path]:
    """Run each flux: read its sources, transform them, and write the result
    split by the "Année" (year) column."""
    duplicated: dict[str, pd.DataFrame] = {}
    written_files = []
    for name, flux in fluxes.items():
        logger.info(f"Processing flux {name}")
        src_df = []
        for filename, df in extract_sources(flux.sources, origin_path):
            df, duplicated = split_duplicates(df, str(filename), duplicated)
            src_df.append(df)
        df = flux.transformation(src_df, **flux.extra_kwargs)
        files = write_split_by(df, "Année", dest_path, writing_func)
        written_files += files
    return written_files
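
# --- Usage sketch (illustrative only, not part of the module) ---------------
# A minimal example of wiring the pieces together, assuming CSV files matching
# "sales_*.csv" and an Excel workbook exist under data/ and that every source
# carries an "Année" column (which consume_fluxes splits on). The file names,
# the sheet name, and concat_sources are hypothetical placeholders; any
# transformation just has to accept a list of DataFrames and return one.
if __name__ == "__main__":
    def concat_sources(dfs: list[pd.DataFrame]) -> pd.DataFrame:
        # Simplest possible transformation: stack all source frames.
        return pd.concat(dfs, ignore_index=True)

    fluxes = {
        "sales": Flux(
            sources=[
                CSVSource(filename="sales_*.csv", options={"sep": ";"}),
                ExcelSource(filename="sales_extra.xlsx", sheet_name="Sheet1"),
            ],
            transformation=concat_sources,
        ),
    }
    written = consume_fluxes(fluxes, origin_path=Path("data"), dest_path=Path("out"))
    print(f"Wrote {len(written)} file(s): {written}")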