Compare commits
2 Commits
a1578f813b
...
215e26b84f
Author | SHA1 | Date | |
---|---|---|---|
215e26b84f | |||
b60fa3be17 |
@ -79,7 +79,11 @@ def feature():
|
|||||||
def datamart():
|
def datamart():
|
||||||
from .gold_mart import FLUXES_LOT
|
from .gold_mart import FLUXES_LOT
|
||||||
|
|
||||||
consume_fluxes(fluxes=FLUXES_LOT, origin_path=GOLD_PATH, dest_path=MART_PATH)
|
consume_fluxes(
|
||||||
|
fluxes=FLUXES_LOT,
|
||||||
|
origin_path=GOLD_PATH,
|
||||||
|
dest_path=MART_PATH,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
@ -4,7 +4,7 @@ from collections.abc import Callable
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel, Field
|
||||||
|
|
||||||
|
|
||||||
class Source(BaseModel):
|
class Source(BaseModel):
|
||||||
@ -38,21 +38,51 @@ class Transformation(BaseModel):
|
|||||||
extra_kwrds: dict = {}
|
extra_kwrds: dict = {}
|
||||||
|
|
||||||
|
|
||||||
|
def to_csv(df, dest_basename: Path) -> Path:
|
||||||
|
dest = dest_basename.parent / (dest_basename.stem + ".csv")
|
||||||
|
if dest.exists():
|
||||||
|
df.to_csv(dest, mode="a", header=False, index=False)
|
||||||
|
else:
|
||||||
|
df.to_csv(dest, index=False)
|
||||||
|
return dest
|
||||||
|
|
||||||
|
|
||||||
|
def to_excel(df, dest_basename: Path) -> Path:
|
||||||
|
dest = dest_basename.parent / (dest_basename.stem + ".xlsx")
|
||||||
|
if dest.exists():
|
||||||
|
raise ValueError(f"The destination exits {dest}")
|
||||||
|
else:
|
||||||
|
df.to_excel(dest)
|
||||||
|
return dest
|
||||||
|
|
||||||
|
|
||||||
class Destination(BaseModel):
|
class Destination(BaseModel):
|
||||||
name: str
|
name: str
|
||||||
|
writer: Callable = Field(to_csv)
|
||||||
|
|
||||||
|
def _write(
|
||||||
|
self,
|
||||||
|
df: pd.DataFrame,
|
||||||
|
dest_basename: Path,
|
||||||
|
writing_func: Callable | None = None,
|
||||||
|
) -> Path:
|
||||||
|
if writing_func is None:
|
||||||
|
writing_func = self.writer
|
||||||
|
|
||||||
|
return writing_func(df, dest_basename)
|
||||||
|
|
||||||
def write(
|
def write(
|
||||||
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable
|
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable | None = None
|
||||||
) -> list[Path]:
|
) -> list[Path]:
|
||||||
dest_basename = dest_path / self.name
|
dest_basename = dest_path / self.name
|
||||||
return [writing_func(df, dest_basename)]
|
return [self._write(df, dest_basename, writing_func)]
|
||||||
|
|
||||||
|
|
||||||
class SplitDestination(Destination):
|
class SplitDestination(Destination):
|
||||||
split_column: str
|
split_column: str
|
||||||
|
|
||||||
def write(
|
def write(
|
||||||
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable
|
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable | None = None
|
||||||
) -> list[Path]:
|
) -> list[Path]:
|
||||||
wrote_files = []
|
wrote_files = []
|
||||||
|
|
||||||
@ -60,7 +90,7 @@ class SplitDestination(Destination):
|
|||||||
filtered_df = df[df[self.split_column] == col_value]
|
filtered_df = df[df[self.split_column] == col_value]
|
||||||
|
|
||||||
dest_basename = dest_path / f"{self.name}-{col_value}"
|
dest_basename = dest_path / f"{self.name}-{col_value}"
|
||||||
dest = writing_func(filtered_df, dest_basename)
|
dest = self._write(filtered_df, dest_basename, writing_func)
|
||||||
wrote_files.append(dest)
|
wrote_files.append(dest)
|
||||||
|
|
||||||
return wrote_files
|
return wrote_files
|
||||||
@ -72,15 +102,6 @@ class Flux(BaseModel):
|
|||||||
destination: Destination
|
destination: Destination
|
||||||
|
|
||||||
|
|
||||||
def to_csv(df, dest_basename: Path) -> Path:
|
|
||||||
dest = dest_basename.parent / (dest_basename.stem + ".csv")
|
|
||||||
if dest.exists():
|
|
||||||
df.to_csv(dest, mode="a", header=False, index=False)
|
|
||||||
else:
|
|
||||||
df.to_csv(dest, index=False)
|
|
||||||
return dest
|
|
||||||
|
|
||||||
|
|
||||||
def write_split_by(
|
def write_split_by(
|
||||||
df: pd.DataFrame, column: str, dest_path: Path, name: str, writing_func
|
df: pd.DataFrame, column: str, dest_path: Path, name: str, writing_func
|
||||||
) -> list[Path]:
|
) -> list[Path]:
|
||||||
@ -119,26 +140,38 @@ def split_duplicates(
|
|||||||
return no_duplicates, duplicated
|
return no_duplicates, duplicated
|
||||||
|
|
||||||
|
|
||||||
|
def consume_flux(
|
||||||
|
name: str,
|
||||||
|
flux: Flux,
|
||||||
|
origin_path: Path,
|
||||||
|
dest_path: Path,
|
||||||
|
duplicated={},
|
||||||
|
):
|
||||||
|
logging.info(f"Consume {name}")
|
||||||
|
src_df = []
|
||||||
|
for filename, df in extract_sources(flux.sources, origin_path):
|
||||||
|
logging.info(f"Extracting {filename}")
|
||||||
|
df, duplicated = split_duplicates(df, str(filename), duplicated)
|
||||||
|
src_df.append(df)
|
||||||
|
|
||||||
|
logging.info(f"Execute {flux.transformation.function.__name__}")
|
||||||
|
df = flux.transformation.function(src_df, **flux.transformation.extra_kwrds)
|
||||||
|
|
||||||
|
files = flux.destination.write(df, dest_path)
|
||||||
|
|
||||||
|
logging.info(f"{files} written")
|
||||||
|
return files
|
||||||
|
|
||||||
|
|
||||||
def consume_fluxes(
|
def consume_fluxes(
|
||||||
fluxes: dict[str, Flux],
|
fluxes: dict[str, Flux],
|
||||||
origin_path: Path,
|
origin_path: Path,
|
||||||
dest_path: Path,
|
dest_path: Path,
|
||||||
writing_func=to_csv,
|
|
||||||
):
|
):
|
||||||
duplicated = {}
|
duplicated = {}
|
||||||
wrote_files = []
|
wrote_files = []
|
||||||
|
|
||||||
for name, flux in fluxes.items():
|
for name, flux in fluxes.items():
|
||||||
logging.info(f"Consume {name}")
|
files = consume_flux(name, flux, origin_path, dest_path, duplicated)
|
||||||
src_df = []
|
|
||||||
for filename, df in extract_sources(flux.sources, origin_path):
|
|
||||||
logging.info(f"Extracting {filename}")
|
|
||||||
df, duplicated = split_duplicates(df, str(filename), duplicated)
|
|
||||||
src_df.append(df)
|
|
||||||
|
|
||||||
logging.info(f"Execute {flux.transformation.function.__name__}")
|
|
||||||
df = flux.transformation.function(src_df, **flux.transformation.extra_kwrds)
|
|
||||||
files = flux.destination.write(df, dest_path, writing_func)
|
|
||||||
logging.info(f"{files} written")
|
|
||||||
wrote_files += files
|
wrote_files += files
|
||||||
return wrote_files
|
return wrote_files
|
||||||
|
@ -11,6 +11,7 @@ from scripts.flux import (
|
|||||||
SplitDestination,
|
SplitDestination,
|
||||||
Transformation,
|
Transformation,
|
||||||
consume_fluxes,
|
consume_fluxes,
|
||||||
|
to_excel,
|
||||||
)
|
)
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@ -26,7 +27,9 @@ FLUXES_LOT = {
|
|||||||
"Lots": Flux(
|
"Lots": Flux(
|
||||||
sources=[CSVSource(filename="CRG/crg-*.csv")],
|
sources=[CSVSource(filename="CRG/crg-*.csv")],
|
||||||
transformation=Transformation(function=build_lots),
|
transformation=Transformation(function=build_lots),
|
||||||
destination=SplitDestination(name="Lot/lot", split_column="Lot"),
|
destination=SplitDestination(
|
||||||
|
name="Lot/lot", split_column="Lot", writer=to_excel
|
||||||
|
),
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -75,6 +78,8 @@ if __name__ == "__main__":
|
|||||||
pnl_fluxes = {}
|
pnl_fluxes = {}
|
||||||
|
|
||||||
files = consume_fluxes(
|
files = consume_fluxes(
|
||||||
fluxes=pnl_fluxes, origin_path=gold_path, dest_path=mart_path
|
fluxes=pnl_fluxes,
|
||||||
|
origin_path=gold_path,
|
||||||
|
dest_path=mart_path,
|
||||||
)
|
)
|
||||||
print(files)
|
print(files)
|
||||||
|
@ -12,9 +12,9 @@ logger.setLevel(logging.DEBUG)
|
|||||||
|
|
||||||
|
|
||||||
def extract_cat(cat: pd.DataFrame):
|
def extract_cat(cat: pd.DataFrame):
|
||||||
cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"]["Anciennes"])
|
cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"])
|
||||||
|
# cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"]["Anciennes"])
|
||||||
cat_trans = cat[cat["Nouvelles"] != "NE PAS IMPORTER"]
|
cat_trans = cat[cat["Nouvelles"] != "NE PAS IMPORTER"]
|
||||||
|
|
||||||
trans = {}
|
trans = {}
|
||||||
for _, (old, new) in cat_trans.iterrows():
|
for _, (old, new) in cat_trans.iterrows():
|
||||||
trans[old] = new
|
trans[old] = new
|
||||||
@ -140,7 +140,7 @@ FLUXES_CRG = {
|
|||||||
),
|
),
|
||||||
"2022 - charge.xlsx": Flux(
|
"2022 - charge.xlsx": Flux(
|
||||||
sources=[
|
sources=[
|
||||||
ExcelSource(filename="2022 - charge.xlsx", sheet_name="Sheet1"),
|
ExcelSource(filename="2022 - charge.xlsx", sheet_name="DB CRG"),
|
||||||
],
|
],
|
||||||
transformation=Transformation(
|
transformation=Transformation(
|
||||||
function=trans_2022_charge,
|
function=trans_2022_charge,
|
||||||
|
Loading…
Reference in New Issue
Block a user