Feat: write cli feature, datamart
This commit is contained in:
parent
98691d5531
commit
e2805f9af2
@ -1,5 +1,22 @@
|
||||
import logging
|
||||
from logging.config import dictConfig
|
||||
from pathlib import Path
|
||||
|
||||
import click
|
||||
|
||||
from .flux import consume_fluxes
|
||||
|
||||
def _require_path(path: Path) -> Path:
    """Return *path* unchanged, or raise if nothing exists at that location.

    A real exception replaces the former ``assert`` statements: asserts are
    skipped when Python runs with ``-O``, and they did not report which path
    was missing.

    Raises:
        FileNotFoundError: if *path* does not exist.
    """
    if not path.exists():
        raise FileNotFoundError(f"Required data path does not exist: {path}")
    return path


# Data tree used by the CLI commands, relative to the current working
# directory. Each location is checked at import time, in this order, so a
# missing directory stops the program before any command runs.
DATA_PATH = _require_path(Path("datas/"))
HISTORY_PATH = _require_path(DATA_PATH / "Histoire")
STAGING_PATH = _require_path(DATA_PATH / "staging")
GOLD_PATH = _require_path(DATA_PATH / "gold")
MART_PATH = _require_path(DATA_PATH / "datamart")
|
||||
|
||||
|
||||
@click.group()
|
||||
@click.option("--debug/--no-debug", default=False)
|
||||
@ -29,14 +46,41 @@ def main(debug):
|
||||
|
||||
@main.command()
def ingest():
    """Run the CRG fluxes from the history directory into the staging directory."""
    # Imported locally: the ``feature`` command imports a different object
    # under the same ``FLUXES_CRG`` name from another module.
    from .history_stagging import FLUXES_CRG

    history_crg_path = HISTORY_PATH / "CRG"
    staging_crg_path = STAGING_PATH / "CRG"

    # Explicit checks instead of ``assert`` so they still run under
    # ``python -O`` and name the missing directory.
    for required in (history_crg_path, staging_crg_path):
        if not required.exists():
            raise FileNotFoundError(f"Required CRG path does not exist: {required}")

    consume_fluxes(
        fluxes=FLUXES_CRG,
        origin_path=history_crg_path,
        dest_path=staging_crg_path,
    )
|
||||
|
||||
|
||||
@main.command()
def feature():
    """Run the CRG fluxes from the staging directory into the gold directory."""
    # Imported locally: the ``ingest`` command imports a different object
    # under the same ``FLUXES_CRG`` name from another module. Here it is a
    # callable that builds the fluxes from the staging directory.
    from .stagging_gold import FLUXES_CRG

    staging_crg_path = STAGING_PATH / "CRG"
    gold_crg_path = GOLD_PATH / "CRG"

    # Explicit checks instead of ``assert`` so they still run under
    # ``python -O`` and name the missing directory.
    for required in (staging_crg_path, gold_crg_path):
        if not required.exists():
            raise FileNotFoundError(f"Required CRG path does not exist: {required}")

    consume_fluxes(
        fluxes=FLUXES_CRG(staging_crg_path),
        origin_path=staging_crg_path,
        dest_path=gold_crg_path,
    )
|
||||
|
||||
|
||||
@main.command()
def datamart():
    # Runs the FLUXES_LOT fluxes, reading from GOLD_PATH and writing under
    # MART_PATH. The flux table is imported when the command is invoked
    # rather than when the CLI module is loaded.
    from .gold_mart import FLUXES_LOT

    consume_fluxes(
        dest_path=MART_PATH,
        origin_path=GOLD_PATH,
        fluxes=FLUXES_LOT,
    )
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
@ -6,9 +6,6 @@ from pathlib import Path
|
||||
import pandas as pd
|
||||
from pydantic import BaseModel
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
class Source(BaseModel):
|
||||
filename: str
|
||||
@ -23,7 +20,7 @@ class ExcelSource(Source):
|
||||
|
||||
def get_df(self, base_path: Path) -> pd.DataFrame:
    """Read this source's Excel sheet into a DataFrame.

    Args:
        base_path: Directory that ``self.filename`` is resolved against.

    Returns:
        The content of sheet ``self.sheet_name`` as returned by
        ``pandas.read_excel``.
    """
    filepath = base_path / self.filename
    # One log call only, with %-style arguments so the message is formatted
    # only if a handler actually emits the record.
    logging.debug("Get content of %s", filepath)
    return pd.read_excel(filepath, sheet_name=self.sheet_name)
|
||||
|
||||
|
||||
@ -32,7 +29,7 @@ class CSVSource(Source):
|
||||
|
||||
def get_df(self, base_path: Path) -> pd.DataFrame:
    """Read this source's CSV file into a DataFrame.

    Args:
        base_path: Directory that ``self.filename`` is resolved against.

    Returns:
        The DataFrame returned by ``pandas.read_csv``, called with
        ``self.options`` as keyword arguments.
    """
    filepath = base_path / self.filename
    # One log call only, with %-style arguments so the message is formatted
    # only if a handler actually emits the record.
    logging.debug("Get content of %s", filepath)
    return pd.read_csv(filepath, **self.options)
|
||||
|
||||
|
||||
@ -132,17 +129,16 @@ def consume_fluxes(
|
||||
wrote_files = []
|
||||
|
||||
for name, flux in fluxes.items():
|
||||
print(f"Consume {name}")
|
||||
logger.info(f"Processing flux {name}")
|
||||
logging.info(f"Consume {name}")
|
||||
src_df = []
|
||||
for filename, df in extract_sources(flux.sources, origin_path):
|
||||
print(f"Extracting {filename}")
|
||||
logging.info(f"Extracting {filename}")
|
||||
df, duplicated = split_duplicates(df, str(filename), duplicated)
|
||||
src_df.append(df)
|
||||
|
||||
print(f"Execute {flux.transformation.function.__name__}")
|
||||
logging.info(f"Execute {flux.transformation.function.__name__}")
|
||||
df = flux.transformation.function(src_df, **flux.transformation.extra_kwrds)
|
||||
files = flux.destination.write(df, dest_path, writing_func)
|
||||
print(files)
|
||||
logging.info(f"{files} written")
|
||||
wrote_files += files
|
||||
return wrote_files
|
||||
|
@ -25,6 +25,15 @@ def build_lots(dfs: list[pd.DataFrame]) -> pd.DataFrame:
|
||||
return df
|
||||
|
||||
|
||||
# Fluxes run by the datamart step. The single "Lots" flux takes a CSVSource on
# the filename pattern "CRG/crg-*.csv", applies build_lots, and writes through
# a SplitDestination configured with split_column="Lot".
# NOTE(review): presumably one output per distinct "Lot" value, named from
# "Lot/lot" — confirm against SplitDestination.
FLUXES_LOT = {
    "Lots": Flux(
        sources=[CSVSource(filename="CRG/crg-*.csv")],
        transformation=Transformation(function=build_lots),
        destination=SplitDestination(name="Lot/lot", split_column="Lot"),
    ),
}
|
||||
|
||||
|
||||
def build_pnl(dfs: list[pd.DataFrame], year: int) -> pd.DataFrame:
|
||||
df = pd.concat(dfs)
|
||||
df = df[df["Année"] == year]
|
||||
@ -60,18 +69,15 @@ if __name__ == "__main__":
|
||||
mart_path = data_path / "datamart"
|
||||
assert mart_path.exists()
|
||||
|
||||
lot_fluxes = {
|
||||
"Lots": Flux(
|
||||
sources=[CSVSource(filename="CRG/crg-*.csv")],
|
||||
transformation=Transformation(function=build_lots),
|
||||
destination=SplitDestination(name="Lot/lot", split_column="Lot"),
|
||||
),
|
||||
}
|
||||
files = consume_fluxes(
|
||||
fluxes=FLUXES_LOT, origin_path=gold_path, dest_path=mart_path
|
||||
)
|
||||
|
||||
years = list(range(2017, 2024))
|
||||
# pnl_fluxes = {f"pnl-{year}": build_pnl_flux(year) for year in years}
|
||||
pnl_fluxes = {}
|
||||
|
||||
files = consume_fluxes(
|
||||
fluxes={**lot_fluxes, **pnl_fluxes}, origin_path=gold_path, dest_path=mart_path
|
||||
fluxes=pnl_fluxes, origin_path=gold_path, dest_path=mart_path
|
||||
)
|
||||
print(files)
|
||||
|
@ -119,7 +119,7 @@ STAGGING_COLUMNS = [
|
||||
"Crédit",
|
||||
]
|
||||
|
||||
FLUXES = {
|
||||
FLUXES_CRG = {
|
||||
"2017 2021 - charge et locataire.xlsx": Flux(
|
||||
sources=[
|
||||
ExcelSource(
|
||||
@ -185,7 +185,7 @@ if __name__ == "__main__":
|
||||
assert staging_crg_path.exists()
|
||||
|
||||
crg_files = consume_fluxes(
|
||||
fluxes=FLUXES,
|
||||
fluxes=FLUXES_CRG,
|
||||
origin_path=history_crg_path,
|
||||
dest_path=staging_crg_path,
|
||||
)
|
||||
|
@ -48,6 +48,12 @@ def build_crg_fluxes(
|
||||
return fluxes
|
||||
|
||||
|
||||
def FLUXES_CRG(staging_crg_path: Path):
    """Build the CRG fluxes for the ``*.csv`` files under *staging_crg_path*.

    Delegates to ``build_crg_fluxes`` with ``feature_crg`` as the
    transformation and returns its result unchanged. Despite the
    constant-style name, this is a callable and must be invoked with the
    staging directory.
    """
    crg_fluxes = build_crg_fluxes(
        crg_path=staging_crg_path,
        pattern="*.csv",
        transformation=feature_crg,
    )
    return crg_fluxes
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
data_path = Path("datas/")
|
||||
assert data_path.exists()
|
||||
@ -62,10 +68,9 @@ if __name__ == "__main__":
|
||||
gold_crg_path = gold_path / "CRG"
|
||||
assert gold_crg_path.exists()
|
||||
|
||||
fluxes = build_crg_fluxes(
|
||||
crg_path=staging_crg_path, pattern="*.csv", transformation=feature_crg
|
||||
)
|
||||
crg_files = consume_fluxes(
|
||||
fluxes=fluxes, origin_path=staging_crg_path, dest_path=gold_crg_path
|
||||
fluxes=FLUXES_CRG(staging_crg_path),
|
||||
origin_path=staging_crg_path,
|
||||
dest_path=gold_crg_path,
|
||||
)
|
||||
print(crg_files)
|
||||
|
Loading…
Reference in New Issue
Block a user