import logging
from abc import abstractmethod
from collections.abc import Callable, Iterator
from pathlib import Path

import pandas as pd
from pydantic import BaseModel

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


class Source(BaseModel):
    filename: str

    @abstractmethod
    def get_df(self, base_path: Path) -> pd.DataFrame:
        raise NotImplementedError


class ExcelSource(Source):
    sheet_name: str

    def get_df(self, base_path: Path) -> pd.DataFrame:
        filepath = base_path / self.filename
        logger.debug(f"Get content of {filepath}")
        return pd.read_excel(filepath, sheet_name=self.sheet_name)


class CSVSource(Source):
    options: dict = {}

    def get_df(self, base_path: Path) -> pd.DataFrame:
        filepath = base_path / self.filename
        logger.debug(f"Get content of {filepath}")
        return pd.read_csv(filepath, **self.options)


class Transformation(BaseModel):
    function: Callable
    extra_kwargs: dict = {}


class Destination(BaseModel):
    name: str

    def write(
        self, df: pd.DataFrame, dest_path: Path, writing_func: Callable
    ) -> list[Path]:
        dest_basename = dest_path / self.name
        return [writing_func(df, dest_basename)]


class SplitDestination(Destination):
    split_column: str

    def write(
        self, df: pd.DataFrame, dest_path: Path, writing_func: Callable
    ) -> list[Path]:
        # Write one file per distinct value of the split column.
        wrote_files = []
        for col_value in df[self.split_column].unique():
            filtered_df = df[df[self.split_column] == col_value]
            dest_basename = dest_path / f"{self.name}-{col_value}"
            dest = writing_func(filtered_df, dest_basename)
            wrote_files.append(dest)
        return wrote_files


class Flux(BaseModel):
    sources: list[Source]
    transformation: Transformation
    destination: Destination


def to_csv(df: pd.DataFrame, dest_basename: Path) -> Path:
    dest = dest_basename.parent / (dest_basename.stem + ".csv")
    if dest.exists():
        # Append without repeating the header when the file already exists.
        df.to_csv(dest, mode="a", header=False, index=False)
    else:
        df.to_csv(dest, index=False)
    return dest


def write_split_by(
    df: pd.DataFrame, column: str, dest_path: Path, name: str, writing_func: Callable
) -> list[Path]:
    # Functional counterpart of SplitDestination.write: one output file per
    # distinct value of `column`.
    wrote_files = []
    for col_value in df[column].unique():
        filtered_df = df[df[column] == col_value]
        dest_basename = dest_path / f"{name}-{col_value}"
        dest = writing_func(filtered_df, dest_basename)
        wrote_files.append(dest)
    return wrote_files


def extract_sources(
    sources: list[Source], base_path: Path = Path()
) -> Iterator[tuple[str, pd.DataFrame]]:
    for src in sources:
        if "*" in src.filename:
            # Expand glob patterns into concrete sources and recurse.
            expanded_src = [
                src.model_copy(update={"filename": str(p.relative_to(base_path))})
                for p in base_path.glob(src.filename)
            ]
            yield from extract_sources(expanded_src, base_path)
        else:
            filepath = base_path / src.filename
            assert filepath.exists(), f"{filepath} does not exist"
            yield src.filename, src.get_df(base_path)


def split_duplicates(
    df: pd.DataFrame, origin: str, duplicated: dict[str, pd.DataFrame]
) -> tuple[pd.DataFrame, dict[str, pd.DataFrame]]:
    # Separate duplicated rows out of df, recording them under their origin.
    duplicates = df.duplicated()
    no_duplicates = df[~duplicates]
    duplicated[origin] = df[duplicates]
    return no_duplicates, duplicated


def consume_fluxes(
    fluxes: dict[str, Flux],
    origin_path: Path,
    dest_path: Path,
    writing_func: Callable = to_csv,
) -> list[Path]:
    duplicated = {}
    wrote_files = []
    for name, flux in fluxes.items():
        logger.info(f"Processing flux {name}")
        src_df = []
        for filename, df in extract_sources(flux.sources, origin_path):
            df, duplicated = split_duplicates(df, str(filename), duplicated)
            src_df.append(df)
        df = flux.transformation.function(src_df, **flux.transformation.extra_kwargs)
        files = flux.destination.write(df, dest_path, writing_func)
        wrote_files += files
    return wrote_files
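

# --- Usage sketch ---
# A minimal, hedged example of wiring the pieces above together. The file
# names ("sales-*.csv", "catalog.xlsx"), the sheet name, the "region" column
# and the concat-based transformation are hypothetical placeholders chosen
# for illustration; they are not part of the pipeline itself.

def _concat(dfs: list[pd.DataFrame], **kwargs) -> pd.DataFrame:
    # Hypothetical transformation: stack all extracted source frames into one.
    return pd.concat(dfs, ignore_index=True)


if __name__ == "__main__":
    fluxes = {
        "sales": Flux(
            # Glob pattern, expanded by extract_sources at consumption time.
            sources=[CSVSource(filename="sales-*.csv")],
            transformation=Transformation(function=_concat),
            # Assumes the concatenated frame has a "region" column to split on.
            destination=SplitDestination(name="sales", split_column="region"),
        ),
        "catalog": Flux(
            sources=[ExcelSource(filename="catalog.xlsx", sheet_name="Sheet1")],
            transformation=Transformation(function=_concat),
            destination=Destination(name="catalog"),
        ),
    }
    out_dir = Path("out")
    out_dir.mkdir(exist_ok=True)  # to_csv expects the destination directory to exist
    written = consume_fluxes(fluxes, origin_path=Path("data"), dest_path=out_dir)
    logger.info(f"Wrote {written}")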