Compare commits
No commits in common. "215e26b84fcf270d2c47c121cdb41ba84f0ba232" and "a1578f813bdef3de66123ba50f79d9f4752b600e" have entirely different histories.
215e26b84f
...
a1578f813b
@ -79,11 +79,7 @@ def feature():
|
|||||||
def datamart():
|
def datamart():
|
||||||
from .gold_mart import FLUXES_LOT
|
from .gold_mart import FLUXES_LOT
|
||||||
|
|
||||||
consume_fluxes(
|
consume_fluxes(fluxes=FLUXES_LOT, origin_path=GOLD_PATH, dest_path=MART_PATH)
|
||||||
fluxes=FLUXES_LOT,
|
|
||||||
origin_path=GOLD_PATH,
|
|
||||||
dest_path=MART_PATH,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
@ -4,7 +4,7 @@ from collections.abc import Callable
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
from pydantic import BaseModel, Field
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
|
||||||
class Source(BaseModel):
|
class Source(BaseModel):
|
||||||
@ -38,51 +38,21 @@ class Transformation(BaseModel):
|
|||||||
extra_kwrds: dict = {}
|
extra_kwrds: dict = {}
|
||||||
|
|
||||||
|
|
||||||
def to_csv(df, dest_basename: Path) -> Path:
|
|
||||||
dest = dest_basename.parent / (dest_basename.stem + ".csv")
|
|
||||||
if dest.exists():
|
|
||||||
df.to_csv(dest, mode="a", header=False, index=False)
|
|
||||||
else:
|
|
||||||
df.to_csv(dest, index=False)
|
|
||||||
return dest
|
|
||||||
|
|
||||||
|
|
||||||
def to_excel(df, dest_basename: Path) -> Path:
|
|
||||||
dest = dest_basename.parent / (dest_basename.stem + ".xlsx")
|
|
||||||
if dest.exists():
|
|
||||||
raise ValueError(f"The destination exits {dest}")
|
|
||||||
else:
|
|
||||||
df.to_excel(dest)
|
|
||||||
return dest
|
|
||||||
|
|
||||||
|
|
||||||
class Destination(BaseModel):
|
class Destination(BaseModel):
|
||||||
name: str
|
name: str
|
||||||
writer: Callable = Field(to_csv)
|
|
||||||
|
|
||||||
def _write(
|
|
||||||
self,
|
|
||||||
df: pd.DataFrame,
|
|
||||||
dest_basename: Path,
|
|
||||||
writing_func: Callable | None = None,
|
|
||||||
) -> Path:
|
|
||||||
if writing_func is None:
|
|
||||||
writing_func = self.writer
|
|
||||||
|
|
||||||
return writing_func(df, dest_basename)
|
|
||||||
|
|
||||||
def write(
|
def write(
|
||||||
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable | None = None
|
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable
|
||||||
) -> list[Path]:
|
) -> list[Path]:
|
||||||
dest_basename = dest_path / self.name
|
dest_basename = dest_path / self.name
|
||||||
return [self._write(df, dest_basename, writing_func)]
|
return [writing_func(df, dest_basename)]
|
||||||
|
|
||||||
|
|
||||||
class SplitDestination(Destination):
|
class SplitDestination(Destination):
|
||||||
split_column: str
|
split_column: str
|
||||||
|
|
||||||
def write(
|
def write(
|
||||||
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable | None = None
|
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable
|
||||||
) -> list[Path]:
|
) -> list[Path]:
|
||||||
wrote_files = []
|
wrote_files = []
|
||||||
|
|
||||||
@ -90,7 +60,7 @@ class SplitDestination(Destination):
|
|||||||
filtered_df = df[df[self.split_column] == col_value]
|
filtered_df = df[df[self.split_column] == col_value]
|
||||||
|
|
||||||
dest_basename = dest_path / f"{self.name}-{col_value}"
|
dest_basename = dest_path / f"{self.name}-{col_value}"
|
||||||
dest = self._write(filtered_df, dest_basename, writing_func)
|
dest = writing_func(filtered_df, dest_basename)
|
||||||
wrote_files.append(dest)
|
wrote_files.append(dest)
|
||||||
|
|
||||||
return wrote_files
|
return wrote_files
|
||||||
@ -102,6 +72,15 @@ class Flux(BaseModel):
|
|||||||
destination: Destination
|
destination: Destination
|
||||||
|
|
||||||
|
|
||||||
|
def to_csv(df, dest_basename: Path) -> Path:
|
||||||
|
dest = dest_basename.parent / (dest_basename.stem + ".csv")
|
||||||
|
if dest.exists():
|
||||||
|
df.to_csv(dest, mode="a", header=False, index=False)
|
||||||
|
else:
|
||||||
|
df.to_csv(dest, index=False)
|
||||||
|
return dest
|
||||||
|
|
||||||
|
|
||||||
def write_split_by(
|
def write_split_by(
|
||||||
df: pd.DataFrame, column: str, dest_path: Path, name: str, writing_func
|
df: pd.DataFrame, column: str, dest_path: Path, name: str, writing_func
|
||||||
) -> list[Path]:
|
) -> list[Path]:
|
||||||
@ -140,13 +119,16 @@ def split_duplicates(
|
|||||||
return no_duplicates, duplicated
|
return no_duplicates, duplicated
|
||||||
|
|
||||||
|
|
||||||
def consume_flux(
|
def consume_fluxes(
|
||||||
name: str,
|
fluxes: dict[str, Flux],
|
||||||
flux: Flux,
|
|
||||||
origin_path: Path,
|
origin_path: Path,
|
||||||
dest_path: Path,
|
dest_path: Path,
|
||||||
duplicated={},
|
writing_func=to_csv,
|
||||||
):
|
):
|
||||||
|
duplicated = {}
|
||||||
|
wrote_files = []
|
||||||
|
|
||||||
|
for name, flux in fluxes.items():
|
||||||
logging.info(f"Consume {name}")
|
logging.info(f"Consume {name}")
|
||||||
src_df = []
|
src_df = []
|
||||||
for filename, df in extract_sources(flux.sources, origin_path):
|
for filename, df in extract_sources(flux.sources, origin_path):
|
||||||
@ -156,22 +138,7 @@ def consume_flux(
|
|||||||
|
|
||||||
logging.info(f"Execute {flux.transformation.function.__name__}")
|
logging.info(f"Execute {flux.transformation.function.__name__}")
|
||||||
df = flux.transformation.function(src_df, **flux.transformation.extra_kwrds)
|
df = flux.transformation.function(src_df, **flux.transformation.extra_kwrds)
|
||||||
|
files = flux.destination.write(df, dest_path, writing_func)
|
||||||
files = flux.destination.write(df, dest_path)
|
|
||||||
|
|
||||||
logging.info(f"{files} written")
|
logging.info(f"{files} written")
|
||||||
return files
|
|
||||||
|
|
||||||
|
|
||||||
def consume_fluxes(
|
|
||||||
fluxes: dict[str, Flux],
|
|
||||||
origin_path: Path,
|
|
||||||
dest_path: Path,
|
|
||||||
):
|
|
||||||
duplicated = {}
|
|
||||||
wrote_files = []
|
|
||||||
|
|
||||||
for name, flux in fluxes.items():
|
|
||||||
files = consume_flux(name, flux, origin_path, dest_path, duplicated)
|
|
||||||
wrote_files += files
|
wrote_files += files
|
||||||
return wrote_files
|
return wrote_files
|
||||||
|
@ -11,7 +11,6 @@ from scripts.flux import (
|
|||||||
SplitDestination,
|
SplitDestination,
|
||||||
Transformation,
|
Transformation,
|
||||||
consume_fluxes,
|
consume_fluxes,
|
||||||
to_excel,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@ -27,9 +26,7 @@ FLUXES_LOT = {
|
|||||||
"Lots": Flux(
|
"Lots": Flux(
|
||||||
sources=[CSVSource(filename="CRG/crg-*.csv")],
|
sources=[CSVSource(filename="CRG/crg-*.csv")],
|
||||||
transformation=Transformation(function=build_lots),
|
transformation=Transformation(function=build_lots),
|
||||||
destination=SplitDestination(
|
destination=SplitDestination(name="Lot/lot", split_column="Lot"),
|
||||||
name="Lot/lot", split_column="Lot", writer=to_excel
|
|
||||||
),
|
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -78,8 +75,6 @@ if __name__ == "__main__":
|
|||||||
pnl_fluxes = {}
|
pnl_fluxes = {}
|
||||||
|
|
||||||
files = consume_fluxes(
|
files = consume_fluxes(
|
||||||
fluxes=pnl_fluxes,
|
fluxes=pnl_fluxes, origin_path=gold_path, dest_path=mart_path
|
||||||
origin_path=gold_path,
|
|
||||||
dest_path=mart_path,
|
|
||||||
)
|
)
|
||||||
print(files)
|
print(files)
|
||||||
|
@ -12,9 +12,9 @@ logger.setLevel(logging.DEBUG)
|
|||||||
|
|
||||||
|
|
||||||
def extract_cat(cat: pd.DataFrame):
|
def extract_cat(cat: pd.DataFrame):
|
||||||
cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"])
|
cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"]["Anciennes"])
|
||||||
# cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"]["Anciennes"])
|
|
||||||
cat_trans = cat[cat["Nouvelles"] != "NE PAS IMPORTER"]
|
cat_trans = cat[cat["Nouvelles"] != "NE PAS IMPORTER"]
|
||||||
|
|
||||||
trans = {}
|
trans = {}
|
||||||
for _, (old, new) in cat_trans.iterrows():
|
for _, (old, new) in cat_trans.iterrows():
|
||||||
trans[old] = new
|
trans[old] = new
|
||||||
@ -140,7 +140,7 @@ FLUXES_CRG = {
|
|||||||
),
|
),
|
||||||
"2022 - charge.xlsx": Flux(
|
"2022 - charge.xlsx": Flux(
|
||||||
sources=[
|
sources=[
|
||||||
ExcelSource(filename="2022 - charge.xlsx", sheet_name="DB CRG"),
|
ExcelSource(filename="2022 - charge.xlsx", sheet_name="Sheet1"),
|
||||||
],
|
],
|
||||||
transformation=Transformation(
|
transformation=Transformation(
|
||||||
function=trans_2022_charge,
|
function=trans_2022_charge,
|
||||||
|
Loading…
Reference in New Issue
Block a user