Compare commits

..

No commits in common. "e2805f9af2ce9b8d6e54d8637c334813e30ea335" and "dd0d8af40cad1b6aeb1e4ed3f16366558f74ba91" have entirely different histories.

6 changed files with 29 additions and 134 deletions

View File

@ -2,4 +2,3 @@ jupyter==1.0.0
pandas==1.5.0 pandas==1.5.0
pdf-oralia==0.3.11 pdf-oralia==0.3.11
pydantic==2.6.1 pydantic==2.6.1
click==8.1.7

View File

@ -1,86 +0,0 @@
import logging
from logging.config import dictConfig
from pathlib import Path
import click
from .flux import consume_fluxes
# Directory layout used by the CLI commands below.
# "datas/" is a relative path, so it is resolved against the current working
# directory of the process. Each path is asserted to exist immediately after
# it is built, so a missing directory aborts the import with AssertionError.
# NOTE(review): assert statements are removed when Python runs with -O, in
# which case none of these existence checks are performed.
DATA_PATH = Path("datas/")
assert DATA_PATH.exists()
# Its "CRG" subdirectory is the origin read by the `ingest` command.
HISTORY_PATH = DATA_PATH / "Histoire"
assert HISTORY_PATH.exists()
# Its "CRG" subdirectory is the destination of `ingest` and the origin of `feature`.
STAGING_PATH = DATA_PATH / "staging"
assert STAGING_PATH.exists()
# Its "CRG" subdirectory is the destination of `feature`; the directory itself
# is the origin of `datamart`.
GOLD_PATH = DATA_PATH / "gold"
assert GOLD_PATH.exists()
# Destination of the `datamart` command.
MART_PATH = DATA_PATH / "datamart"
assert MART_PATH.exists()
# Root command group of the CLI. It only configures logging; the actual work
# is done by the sub-commands registered on it.
# NOTE: documented with comments rather than a docstring on purpose, because
# click would expose a docstring as the group's --help text.
@click.group()
@click.option("--debug/--no-debug", default=False)
def main(debug):
    # --debug selects DEBUG verbosity, otherwise INFO; the same level is
    # applied to both the stream handler and the root logger.
    level = logging.DEBUG if debug else logging.INFO

    stream_handler = {
        "class": "logging.StreamHandler",
        "formatter": "f",
        "level": level,
    }
    root_logger = {
        "handlers": ["h"],
        "level": level,
    }

    dictConfig(
        {
            "version": 1,
            "formatters": {
                "f": {"format": "%(levelname)-8s %(name)-12s %(message)s"},
            },
            "handlers": {"h": stream_handler},
            "root": root_logger,
        }
    )
# `ingest` sub-command: runs the FLUXES_CRG fluxes from Histoire/CRG into
# staging/CRG. Both directories must already exist.
@main.command()
def ingest():
    # Imported inside the command, so the flux definitions are only loaded
    # when this command is actually invoked.
    from .history_stagging import FLUXES_CRG

    origin = HISTORY_PATH / "CRG"
    assert origin.exists()

    destination = STAGING_PATH / "CRG"
    assert destination.exists()

    consume_fluxes(fluxes=FLUXES_CRG, origin_path=origin, dest_path=destination)
# `feature` sub-command: builds the CRG fluxes from the staging/CRG directory
# and runs them from staging/CRG into gold/CRG. Both directories must exist.
@main.command()
def feature():
    # Imported inside the command; here FLUXES_CRG is a callable that takes
    # the staging CRG path and returns the fluxes to consume.
    from .stagging_gold import FLUXES_CRG as build_fluxes

    source_dir = STAGING_PATH / "CRG"
    assert source_dir.exists()

    target_dir = GOLD_PATH / "CRG"
    assert target_dir.exists()

    crg_fluxes = build_fluxes(source_dir)
    consume_fluxes(fluxes=crg_fluxes, origin_path=source_dir, dest_path=target_dir)
# `datamart` sub-command: runs the FLUXES_LOT fluxes from the gold directory
# into the datamart directory.
@main.command()
def datamart():
    # Imported inside the command, so the flux definitions are only loaded
    # when this command is actually invoked.
    from .gold_mart import FLUXES_LOT

    consume_fluxes(
        fluxes=FLUXES_LOT,
        origin_path=GOLD_PATH,
        dest_path=MART_PATH,
    )
# Start the click command group only when this module is executed as a
# script, not when it is imported.
if __name__ == "__main__":
    main()

View File

@ -6,6 +6,9 @@ from pathlib import Path
import pandas as pd import pandas as pd
from pydantic import BaseModel from pydantic import BaseModel
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
class Source(BaseModel): class Source(BaseModel):
filename: str filename: str
@ -20,7 +23,7 @@ class ExcelSource(Source):
def get_df(self, base_path: Path) -> pd.DataFrame: def get_df(self, base_path: Path) -> pd.DataFrame:
filepath = base_path / self.filename filepath = base_path / self.filename
logging.debug(f"Get content of {filepath}") logger.debug(f"Get content of {filepath}")
return pd.read_excel(filepath, sheet_name=self.sheet_name) return pd.read_excel(filepath, sheet_name=self.sheet_name)
@ -29,7 +32,7 @@ class CSVSource(Source):
def get_df(self, base_path: Path) -> pd.DataFrame: def get_df(self, base_path: Path) -> pd.DataFrame:
filepath = base_path / self.filename filepath = base_path / self.filename
logging.debug(f"Get content of {filepath}") logger.debug(f"Get content of {filepath}")
return pd.read_csv(filepath, **self.options) return pd.read_csv(filepath, **self.options)
@ -129,16 +132,15 @@ def consume_fluxes(
wrote_files = [] wrote_files = []
for name, flux in fluxes.items(): for name, flux in fluxes.items():
logging.info(f"Consume {name}") print(name)
logger.info(f"Processing flux {name}")
src_df = [] src_df = []
for filename, df in extract_sources(flux.sources, origin_path): for filename, df in extract_sources(flux.sources, origin_path):
logging.info(f"Extracting {filename}")
df, duplicated = split_duplicates(df, str(filename), duplicated) df, duplicated = split_duplicates(df, str(filename), duplicated)
src_df.append(df) src_df.append(df)
logging.info(f"Execute {flux.transformation.function.__name__}")
df = flux.transformation.function(src_df, **flux.transformation.extra_kwrds) df = flux.transformation.function(src_df, **flux.transformation.extra_kwrds)
files = flux.destination.write(df, dest_path, writing_func) files = flux.destination.write(df, dest_path, writing_func)
logging.info(f"{files} written")
wrote_files += files wrote_files += files
return wrote_files return wrote_files

View File

@ -18,22 +18,13 @@ logger.setLevel(logging.DEBUG)
def build_lots(dfs: list[pd.DataFrame]) -> pd.DataFrame: def build_lots(dfs: list[pd.DataFrame]) -> pd.DataFrame:
df = pd.concat(dfs) df = dfs[0]
df = df.assign( df = df.assign(
Impact=df["Crédit"] - df["Débit"], Impact=df["Crédit"] - df["Débit"],
) )
return df return df
FLUXES_LOT = {
"Lots": Flux(
sources=[CSVSource(filename="CRG/crg-*.csv")],
transformation=Transformation(function=build_lots),
destination=SplitDestination(name="Lot/lot", split_column="Lot"),
),
}
def build_pnl(dfs: list[pd.DataFrame], year: int) -> pd.DataFrame: def build_pnl(dfs: list[pd.DataFrame], year: int) -> pd.DataFrame:
df = pd.concat(dfs) df = pd.concat(dfs)
df = df[df["Année"] == year] df = df[df["Année"] == year]
@ -49,10 +40,8 @@ def build_pnl_flux(year: int) -> Flux:
CSVSource(filename=f"CRG/crg-{year}.csv"), CSVSource(filename=f"CRG/crg-{year}.csv"),
CSVSource(filename=f"banque/banque-{year}.csv"), CSVSource(filename=f"banque/banque-{year}.csv"),
], ],
transformation=Transformation( transformation=build_pnl,
function=build_pnl, extra_kwrds={"year": year},
extra_kwrds={"year": year},
),
destination=Destination(name=f"pnl/{year}"), destination=Destination(name=f"pnl/{year}"),
) )
@ -69,15 +58,18 @@ if __name__ == "__main__":
mart_path = data_path / "datamart" mart_path = data_path / "datamart"
assert mart_path.exists() assert mart_path.exists()
files = consume_fluxes( lot_fluxes = {
fluxes=FLUXES_LOT, origin_path=gold_path, dest_path=mart_path "Lots": Flux(
) sources=[CSVSource(filename="CRG/crg-*.csv")],
transformation=Transformation(function=build_lots),
destination=SplitDestination(name="Lot/lot", split_column="Lot"),
),
}
years = list(range(2017, 2024)) years = list(range(2017, 2024))
# pnl_fluxes = {f"pnl-{year}": build_pnl_flux(year) for year in years} # pnl_fluxes = {f"pnl-{year}": build_pnl_flux(year) for year in years}
pnl_fluxes = {} pnl_fluxes = {}
files = consume_fluxes( files = consume_fluxes(
fluxes=pnl_fluxes, origin_path=gold_path, dest_path=mart_path fluxes={**lot_fluxes, **pnl_fluxes}, origin_path=gold_path, dest_path=mart_path
) )
print(files) print(files)

View File

@ -22,12 +22,6 @@ def extract_cat(cat: pd.DataFrame):
return trans, cat_drop return trans, cat_drop
def lot_naming(value):
if str(value).isnumeric():
return str(value).zfill(2)
return "PC"
def trans_2017_2021( def trans_2017_2021(
dfs: list[pd.DataFrame], stagging_columns: list[str], **kwrds dfs: list[pd.DataFrame], stagging_columns: list[str], **kwrds
) -> pd.DataFrame: ) -> pd.DataFrame:
@ -41,7 +35,7 @@ def trans_2017_2021(
Porte=df["porte"], Porte=df["porte"],
Débit=df["Débit"].fillna(0), Débit=df["Débit"].fillna(0),
Crédit=df["Crédit"].fillna(0), Crédit=df["Crédit"].fillna(0),
Lot=df["porte"].apply(lot_naming), Lot=df["immeuble"].astype(str) + df["porte"].astype("str").str.zfill(2),
Année=df["Date"].astype(str).str.slice(0, 4), Année=df["Date"].astype(str).str.slice(0, 4),
Mois=df["Date"].astype(str).str.slice(5, 7), Mois=df["Date"].astype(str).str.slice(5, 7),
Catégorie=df["Categorie"].replace(cat_trans), Catégorie=df["Categorie"].replace(cat_trans),
@ -60,7 +54,7 @@ def trans_2022_charge(
Porte=df["lot"], Porte=df["lot"],
Débit=df["Débits"].fillna(0), Débit=df["Débits"].fillna(0),
Crédit=df["Crédits"].fillna(0), Crédit=df["Crédits"].fillna(0),
Lot=df["lot"].apply(lot_naming), Lot=df["immeuble"].astype(str)[0] + df["lot"].astype("str").str.zfill(2),
Année=df["annee"], Année=df["annee"],
Mois=df["mois"], Mois=df["mois"],
Catégorie=df["Catégorie Charge"], Catégorie=df["Catégorie Charge"],
@ -81,7 +75,7 @@ def trans_2022_loc(
Porte=df["lot"], Porte=df["lot"],
Débit=0, Débit=0,
Crédit=df["Réglés"].fillna(0), Crédit=df["Réglés"].fillna(0),
Lot=df["lot"].apply(lot_naming), Lot=df["immeuble"].astype(str)[0] + df["lot"].astype("str").str.zfill(2),
Année=df["annee"], Année=df["annee"],
Mois=df["mois"], Mois=df["mois"],
Catégorie="Loyer Charge", Catégorie="Loyer Charge",
@ -99,7 +93,7 @@ def trans_2023(
df = df.assign( df = df.assign(
Débit=df["Débit"].fillna(0), Débit=df["Débit"].fillna(0),
Crédit=df["Crédit"].fillna(0), Crédit=df["Crédit"].fillna(0),
Lot=lot_naming(df["Porte"]), Lot=df["Immeuble"].astype(str) + df["Porte"].astype("str").str.zfill(2),
Année=year, Année=year,
) )
return df[stagging_columns] return df[stagging_columns]
@ -119,7 +113,7 @@ STAGGING_COLUMNS = [
"Crédit", "Crédit",
] ]
FLUXES_CRG = { FLUXES = {
"2017 2021 - charge et locataire.xlsx": Flux( "2017 2021 - charge et locataire.xlsx": Flux(
sources=[ sources=[
ExcelSource( ExcelSource(
@ -185,7 +179,7 @@ if __name__ == "__main__":
assert staging_crg_path.exists() assert staging_crg_path.exists()
crg_files = consume_fluxes( crg_files = consume_fluxes(
fluxes=FLUXES_CRG, fluxes=FLUXES,
origin_path=history_crg_path, origin_path=history_crg_path,
dest_path=staging_crg_path, dest_path=staging_crg_path,
) )

View File

@ -14,7 +14,6 @@ def feature_crg(dfs: list[pd.DataFrame]) -> pd.DataFrame:
df = dfs[0] df = dfs[0]
df = df.assign( df = df.assign(
Impact=df["Crédit"] - df["Débit"], Impact=df["Crédit"] - df["Débit"],
Lot=df["Immeuble"].astype(str) + df["Lot"].astype("str"),
) )
return df return df
@ -48,12 +47,6 @@ def build_crg_fluxes(
return fluxes return fluxes
def FLUXES_CRG(staging_crg_path: Path):
return build_crg_fluxes(
crg_path=staging_crg_path, pattern="*.csv", transformation=feature_crg
)
if __name__ == "__main__": if __name__ == "__main__":
data_path = Path("datas/") data_path = Path("datas/")
assert data_path.exists() assert data_path.exists()
@ -68,9 +61,10 @@ if __name__ == "__main__":
gold_crg_path = gold_path / "CRG" gold_crg_path = gold_path / "CRG"
assert gold_crg_path.exists() assert gold_crg_path.exists()
fluxes = build_crg_fluxes(
crg_path=staging_crg_path, pattern="*.csv", transformation=feature_crg
)
crg_files = consume_fluxes( crg_files = consume_fluxes(
fluxes=FLUXES_CRG(staging_crg_path), fluxes=fluxes, origin_path=staging_crg_path, dest_path=gold_crg_path
origin_path=staging_crg_path,
dest_path=gold_crg_path,
) )
print(crg_files) print(crg_files)