plesna/scripts/stagging_gold.py

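"""Flux definitions that move staging CRG CSV files to the gold layer, adding derived columns."""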
import logging
from collections.abc import Callable
from pathlib import Path
import pandas as pd
from scripts.flux import CSVSource, Destination, Flux, Transformation, consume_fluxes

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


def feature_crg(dfs: list[pd.DataFrame]) -> pd.DataFrame:
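    """Add derived columns to the staged CRG dataframe: Impact = Crédit - Débit, Lot = Immeuble + Lot."""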
    df = dfs[0]
    df = df.assign(
        Impact=df["Crédit"] - df["Débit"],
        Lot=df["Immeuble"].astype(str) + df["Lot"].astype(str),
    )
    return df


GOLD_COLUMNS = [
    "Régie",
    "Immeuble",
    "Porte",
    "Lot",
    "Année",
    "Mois",
    "Catégorie",
    "Fournisseur",
    "Libellé",
    "Débit",
    "Crédit",
    "Impact",
]


def build_crg_fluxes(
    crg_path: Path, pattern: str, transformation: Callable, csv_options: dict = {}
) -> dict[str, Flux]:
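    """Build one Flux per CSV file in crg_path matching pattern, keyed by "CRG - <filename>"."""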
    fluxes = {}
    for file in crg_path.glob(pattern):
        fluxes[f"CRG - {file.name}"] = Flux(
            sources=[CSVSource(filename=file.name, options=csv_options)],
            transformation=Transformation(function=transformation),
            destination=Destination(name=file.name),
        )
    return fluxes


def FLUXES_CRG(staging_crg_path: Path):
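    """Build the fluxes for every staged CRG CSV file."""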
    return build_crg_fluxes(
        crg_path=staging_crg_path, pattern="*.csv", transformation=feature_crg
    )


if __name__ == "__main__":
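    # Expected layout on disk: datas/staging/CRG (input) and datas/gold/CRG (output).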
    data_path = Path("datas/")
    assert data_path.exists()
    staging_path = data_path / "staging"
    assert staging_path.exists()
    staging_crg_path = staging_path / "CRG"
    assert staging_crg_path.exists()
    gold_path = data_path / "gold"
    assert gold_path.exists()
    gold_crg_path = gold_path / "CRG"
    assert gold_crg_path.exists()

    crg_files = consume_fluxes(
        fluxes=FLUXES_CRG(staging_crg_path),
        origin_path=staging_crg_path,
        dest_path=gold_crg_path,
    )
    print(crg_files)