Compare commits

..

No commits in common. "215e26b84fcf270d2c47c121cdb41ba84f0ba232" and "a1578f813bdef3de66123ba50f79d9f4752b600e" have entirely different histories.

4 changed files with 32 additions and 74 deletions

View File

@ -79,11 +79,7 @@ def feature():
def datamart():
from .gold_mart import FLUXES_LOT
consume_fluxes(
fluxes=FLUXES_LOT,
origin_path=GOLD_PATH,
dest_path=MART_PATH,
)
consume_fluxes(fluxes=FLUXES_LOT, origin_path=GOLD_PATH, dest_path=MART_PATH)
if __name__ == "__main__":

View File

@ -4,7 +4,7 @@ from collections.abc import Callable
from pathlib import Path
import pandas as pd
from pydantic import BaseModel, Field
from pydantic import BaseModel
class Source(BaseModel):
@ -38,51 +38,21 @@ class Transformation(BaseModel):
extra_kwrds: dict = {}
def to_csv(df, dest_basename: Path) -> Path:
dest = dest_basename.parent / (dest_basename.stem + ".csv")
if dest.exists():
df.to_csv(dest, mode="a", header=False, index=False)
else:
df.to_csv(dest, index=False)
return dest
def to_excel(df, dest_basename: Path) -> Path:
dest = dest_basename.parent / (dest_basename.stem + ".xlsx")
if dest.exists():
raise ValueError(f"The destination exits {dest}")
else:
df.to_excel(dest)
return dest
class Destination(BaseModel):
name: str
writer: Callable = Field(to_csv)
def _write(
self,
df: pd.DataFrame,
dest_basename: Path,
writing_func: Callable | None = None,
) -> Path:
if writing_func is None:
writing_func = self.writer
return writing_func(df, dest_basename)
def write(
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable | None = None
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable
) -> list[Path]:
dest_basename = dest_path / self.name
return [self._write(df, dest_basename, writing_func)]
return [writing_func(df, dest_basename)]
class SplitDestination(Destination):
split_column: str
def write(
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable | None = None
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable
) -> list[Path]:
wrote_files = []
@ -90,7 +60,7 @@ class SplitDestination(Destination):
filtered_df = df[df[self.split_column] == col_value]
dest_basename = dest_path / f"{self.name}-{col_value}"
dest = self._write(filtered_df, dest_basename, writing_func)
dest = writing_func(filtered_df, dest_basename)
wrote_files.append(dest)
return wrote_files
@ -102,6 +72,15 @@ class Flux(BaseModel):
destination: Destination
def to_csv(df, dest_basename: Path) -> Path:
dest = dest_basename.parent / (dest_basename.stem + ".csv")
if dest.exists():
df.to_csv(dest, mode="a", header=False, index=False)
else:
df.to_csv(dest, index=False)
return dest
def write_split_by(
df: pd.DataFrame, column: str, dest_path: Path, name: str, writing_func
) -> list[Path]:
@ -140,13 +119,16 @@ def split_duplicates(
return no_duplicates, duplicated
def consume_flux(
name: str,
flux: Flux,
def consume_fluxes(
fluxes: dict[str, Flux],
origin_path: Path,
dest_path: Path,
duplicated={},
writing_func=to_csv,
):
duplicated = {}
wrote_files = []
for name, flux in fluxes.items():
logging.info(f"Consume {name}")
src_df = []
for filename, df in extract_sources(flux.sources, origin_path):
@ -156,22 +138,7 @@ def consume_flux(
logging.info(f"Execute {flux.transformation.function.__name__}")
df = flux.transformation.function(src_df, **flux.transformation.extra_kwrds)
files = flux.destination.write(df, dest_path)
files = flux.destination.write(df, dest_path, writing_func)
logging.info(f"{files} written")
return files
def consume_fluxes(
fluxes: dict[str, Flux],
origin_path: Path,
dest_path: Path,
):
duplicated = {}
wrote_files = []
for name, flux in fluxes.items():
files = consume_flux(name, flux, origin_path, dest_path, duplicated)
wrote_files += files
return wrote_files

View File

@ -11,7 +11,6 @@ from scripts.flux import (
SplitDestination,
Transformation,
consume_fluxes,
to_excel,
)
logger = logging.getLogger(__name__)
@ -27,9 +26,7 @@ FLUXES_LOT = {
"Lots": Flux(
sources=[CSVSource(filename="CRG/crg-*.csv")],
transformation=Transformation(function=build_lots),
destination=SplitDestination(
name="Lot/lot", split_column="Lot", writer=to_excel
),
destination=SplitDestination(name="Lot/lot", split_column="Lot"),
),
}
@ -78,8 +75,6 @@ if __name__ == "__main__":
pnl_fluxes = {}
files = consume_fluxes(
fluxes=pnl_fluxes,
origin_path=gold_path,
dest_path=mart_path,
fluxes=pnl_fluxes, origin_path=gold_path, dest_path=mart_path
)
print(files)

View File

@ -12,9 +12,9 @@ logger.setLevel(logging.DEBUG)
def extract_cat(cat: pd.DataFrame):
cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"])
# cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"]["Anciennes"])
cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"]["Anciennes"])
cat_trans = cat[cat["Nouvelles"] != "NE PAS IMPORTER"]
trans = {}
for _, (old, new) in cat_trans.iterrows():
trans[old] = new
@ -140,7 +140,7 @@ FLUXES_CRG = {
),
"2022 - charge.xlsx": Flux(
sources=[
ExcelSource(filename="2022 - charge.xlsx", sheet_name="DB CRG"),
ExcelSource(filename="2022 - charge.xlsx", sheet_name="Sheet1"),
],
transformation=Transformation(
function=trans_2022_charge,