Compare commits

..

No commits in common. "215e26b84fcf270d2c47c121cdb41ba84f0ba232" and "a1578f813bdef3de66123ba50f79d9f4752b600e" have entirely different histories.

4 changed files with 32 additions and 74 deletions

View File

@ -79,11 +79,7 @@ def feature():
def datamart(): def datamart():
from .gold_mart import FLUXES_LOT from .gold_mart import FLUXES_LOT
consume_fluxes( consume_fluxes(fluxes=FLUXES_LOT, origin_path=GOLD_PATH, dest_path=MART_PATH)
fluxes=FLUXES_LOT,
origin_path=GOLD_PATH,
dest_path=MART_PATH,
)
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -4,7 +4,7 @@ from collections.abc import Callable
from pathlib import Path from pathlib import Path
import pandas as pd import pandas as pd
from pydantic import BaseModel, Field from pydantic import BaseModel
class Source(BaseModel): class Source(BaseModel):
@ -38,51 +38,21 @@ class Transformation(BaseModel):
extra_kwrds: dict = {} extra_kwrds: dict = {}
def to_csv(df, dest_basename: Path) -> Path:
dest = dest_basename.parent / (dest_basename.stem + ".csv")
if dest.exists():
df.to_csv(dest, mode="a", header=False, index=False)
else:
df.to_csv(dest, index=False)
return dest
def to_excel(df, dest_basename: Path) -> Path:
dest = dest_basename.parent / (dest_basename.stem + ".xlsx")
if dest.exists():
raise ValueError(f"The destination exits {dest}")
else:
df.to_excel(dest)
return dest
class Destination(BaseModel): class Destination(BaseModel):
name: str name: str
writer: Callable = Field(to_csv)
def _write(
self,
df: pd.DataFrame,
dest_basename: Path,
writing_func: Callable | None = None,
) -> Path:
if writing_func is None:
writing_func = self.writer
return writing_func(df, dest_basename)
def write( def write(
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable | None = None self, df: pd.DataFrame, dest_path: Path, writing_func: Callable
) -> list[Path]: ) -> list[Path]:
dest_basename = dest_path / self.name dest_basename = dest_path / self.name
return [self._write(df, dest_basename, writing_func)] return [writing_func(df, dest_basename)]
class SplitDestination(Destination): class SplitDestination(Destination):
split_column: str split_column: str
def write( def write(
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable | None = None self, df: pd.DataFrame, dest_path: Path, writing_func: Callable
) -> list[Path]: ) -> list[Path]:
wrote_files = [] wrote_files = []
@ -90,7 +60,7 @@ class SplitDestination(Destination):
filtered_df = df[df[self.split_column] == col_value] filtered_df = df[df[self.split_column] == col_value]
dest_basename = dest_path / f"{self.name}-{col_value}" dest_basename = dest_path / f"{self.name}-{col_value}"
dest = self._write(filtered_df, dest_basename, writing_func) dest = writing_func(filtered_df, dest_basename)
wrote_files.append(dest) wrote_files.append(dest)
return wrote_files return wrote_files
@ -102,6 +72,15 @@ class Flux(BaseModel):
destination: Destination destination: Destination
def to_csv(df, dest_basename: Path) -> Path:
dest = dest_basename.parent / (dest_basename.stem + ".csv")
if dest.exists():
df.to_csv(dest, mode="a", header=False, index=False)
else:
df.to_csv(dest, index=False)
return dest
def write_split_by( def write_split_by(
df: pd.DataFrame, column: str, dest_path: Path, name: str, writing_func df: pd.DataFrame, column: str, dest_path: Path, name: str, writing_func
) -> list[Path]: ) -> list[Path]:
@ -140,38 +119,26 @@ def split_duplicates(
return no_duplicates, duplicated return no_duplicates, duplicated
def consume_flux(
name: str,
flux: Flux,
origin_path: Path,
dest_path: Path,
duplicated={},
):
logging.info(f"Consume {name}")
src_df = []
for filename, df in extract_sources(flux.sources, origin_path):
logging.info(f"Extracting {filename}")
df, duplicated = split_duplicates(df, str(filename), duplicated)
src_df.append(df)
logging.info(f"Execute {flux.transformation.function.__name__}")
df = flux.transformation.function(src_df, **flux.transformation.extra_kwrds)
files = flux.destination.write(df, dest_path)
logging.info(f"{files} written")
return files
def consume_fluxes( def consume_fluxes(
fluxes: dict[str, Flux], fluxes: dict[str, Flux],
origin_path: Path, origin_path: Path,
dest_path: Path, dest_path: Path,
writing_func=to_csv,
): ):
duplicated = {} duplicated = {}
wrote_files = [] wrote_files = []
for name, flux in fluxes.items(): for name, flux in fluxes.items():
files = consume_flux(name, flux, origin_path, dest_path, duplicated) logging.info(f"Consume {name}")
src_df = []
for filename, df in extract_sources(flux.sources, origin_path):
logging.info(f"Extracting {filename}")
df, duplicated = split_duplicates(df, str(filename), duplicated)
src_df.append(df)
logging.info(f"Execute {flux.transformation.function.__name__}")
df = flux.transformation.function(src_df, **flux.transformation.extra_kwrds)
files = flux.destination.write(df, dest_path, writing_func)
logging.info(f"{files} written")
wrote_files += files wrote_files += files
return wrote_files return wrote_files

View File

@ -11,7 +11,6 @@ from scripts.flux import (
SplitDestination, SplitDestination,
Transformation, Transformation,
consume_fluxes, consume_fluxes,
to_excel,
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -27,9 +26,7 @@ FLUXES_LOT = {
"Lots": Flux( "Lots": Flux(
sources=[CSVSource(filename="CRG/crg-*.csv")], sources=[CSVSource(filename="CRG/crg-*.csv")],
transformation=Transformation(function=build_lots), transformation=Transformation(function=build_lots),
destination=SplitDestination( destination=SplitDestination(name="Lot/lot", split_column="Lot"),
name="Lot/lot", split_column="Lot", writer=to_excel
),
), ),
} }
@ -78,8 +75,6 @@ if __name__ == "__main__":
pnl_fluxes = {} pnl_fluxes = {}
files = consume_fluxes( files = consume_fluxes(
fluxes=pnl_fluxes, fluxes=pnl_fluxes, origin_path=gold_path, dest_path=mart_path
origin_path=gold_path,
dest_path=mart_path,
) )
print(files) print(files)

View File

@ -12,9 +12,9 @@ logger.setLevel(logging.DEBUG)
def extract_cat(cat: pd.DataFrame): def extract_cat(cat: pd.DataFrame):
cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"]) cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"]["Anciennes"])
# cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"]["Anciennes"])
cat_trans = cat[cat["Nouvelles"] != "NE PAS IMPORTER"] cat_trans = cat[cat["Nouvelles"] != "NE PAS IMPORTER"]
trans = {} trans = {}
for _, (old, new) in cat_trans.iterrows(): for _, (old, new) in cat_trans.iterrows():
trans[old] = new trans[old] = new
@ -140,7 +140,7 @@ FLUXES_CRG = {
), ),
"2022 - charge.xlsx": Flux( "2022 - charge.xlsx": Flux(
sources=[ sources=[
ExcelSource(filename="2022 - charge.xlsx", sheet_name="DB CRG"), ExcelSource(filename="2022 - charge.xlsx", sheet_name="Sheet1"),
], ],
transformation=Transformation( transformation=Transformation(
function=trans_2022_charge, function=trans_2022_charge,