import logging
from abc import abstractmethod
from collections.abc import Callable
from pathlib import Path

import pandas as pd
from pydantic import BaseModel

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


class Source(BaseModel):
    """A file-based data source that can be loaded into a DataFrame."""

    filename: str

    @abstractmethod
    def get_df(self, base_path: Path) -> pd.DataFrame:
        raise NotImplementedError


class ExcelSource(Source):
    sheet_name: str

    def get_df(self, base_path: Path) -> pd.DataFrame:
        filepath = base_path / self.filename
        logger.debug(f"Get content of {filepath}")
        return pd.read_excel(filepath, sheet_name=self.sheet_name)


class CSVSource(Source):
    options: dict

    def get_df(self, base_path: Path) -> pd.DataFrame:
        filepath = base_path / self.filename
        logger.debug(f"Get content of {filepath}")
        return pd.read_csv(filepath, **self.options)


class Destination(BaseModel):
    name: str


class Flux(BaseModel):
    sources: list[Source]
    transformation: Callable[..., pd.DataFrame]
    extra_kwargs: dict = {}
    destination: Destination
    split_destination: str = ""


def to_csv(df: pd.DataFrame, dest_basename: Path) -> Path:
    """Write (or append) a DataFrame to <dest_basename>.csv and return the path."""
    dest = dest_basename.parent / (dest_basename.stem + ".csv")
    if dest.exists():
        # Append without repeating the header when the file already exists.
        df.to_csv(dest, mode="a", header=False, index=False)
    else:
        df.to_csv(dest, index=False)
    return dest


def write_split_by(
    df: pd.DataFrame,
    column: str,
    dest_path: Path,
    name: str,
    writing_func: Callable[[pd.DataFrame, Path], Path],
) -> list[Path]:
    """Write one file per distinct value of `column`, returning the written paths."""
    wrote_files = []
    for col_value in df[column].unique():
        filtered_df = df[df[column] == col_value]
        dest_basename = dest_path / f"{name}-{col_value}"
        dest = writing_func(filtered_df, dest_basename)
        wrote_files.append(dest)
    return wrote_files


def extract_sources(sources: list[Source], base_path: Path = Path()):
    """Yield (filename, DataFrame) pairs, expanding glob patterns in filenames."""
    for src in sources:
        if "*" in src.filename:
            # Expand the glob relative to base_path and recurse on the matches,
            # keeping each expanded filename relative so get_df can re-join it.
            expanded_src = [
                src.model_copy(update={"filename": str(p.relative_to(base_path))})
                for p in base_path.glob(src.filename)
            ]
            yield from extract_sources(expanded_src, base_path)
        else:
            filepath = base_path / src.filename
            assert filepath.exists(), f"{filepath} does not exist"
            yield src.filename, src.get_df(base_path)


def split_duplicates(
    df: pd.DataFrame, origin: str, duplicated: dict[str, pd.DataFrame]
) -> tuple[pd.DataFrame, dict[str, pd.DataFrame]]:
    """Drop duplicated rows from `df`, recording the dropped rows under `origin`."""
    duplicates = df.duplicated()
    no_duplicates = df[~duplicates]
    duplicated[origin] = df[duplicates]
    return no_duplicates, duplicated


def consume_fluxes(
    fluxes: dict[str, Flux],
    origin_path: Path,
    dest_path: Path,
    writing_func: Callable[[pd.DataFrame, Path], Path] = to_csv,
) -> list[Path]:
    """Run every flux: extract its sources, transform them, and write the result."""
    duplicated: dict[str, pd.DataFrame] = {}
    wrote_files = []
    for name, flux in fluxes.items():
        logger.info(f"Processing flux {name}")
        src_df = []
        for filename, df in extract_sources(flux.sources, origin_path):
            df, duplicated = split_duplicates(df, str(filename), duplicated)
            src_df.append(df)
        df = flux.transformation(src_df, **flux.extra_kwargs)
        if flux.split_destination:
            files = write_split_by(
                df=df,
                column=flux.split_destination,
                dest_path=dest_path,
                name=flux.destination.name,
                writing_func=writing_func,
            )
        else:
            dest_basename = dest_path / flux.destination.name
            files = [writing_func(df, dest_basename)]
        wrote_files += files
    return wrote_files
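

# --- Usage sketch (illustrative only) ---
# A minimal example of wiring a Flux together and running consume_fluxes.
# The file pattern ("sales-*.csv"), the flux name ("by_region"), the "region"
# column, and the _concat transformation below are hypothetical placeholders,
# not part of the pipeline defined above.

def _concat(dfs: list[pd.DataFrame], **kwargs) -> pd.DataFrame:
    # Example transformation: stack all extracted source frames into one.
    return pd.concat(dfs, ignore_index=True)


if __name__ == "__main__":
    fluxes = {
        "by_region": Flux(
            sources=[CSVSource(filename="sales-*.csv", options={"sep": ";"})],
            transformation=_concat,
            destination=Destination(name="sales"),
            split_destination="region",  # one CSV per distinct region value
        )
    }
    written = consume_fluxes(fluxes, origin_path=Path("data"), dest_path=Path("out"))
    print(written)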