Compare commits
No commits in common. "e2805f9af2ce9b8d6e54d8637c334813e30ea335" and "dd0d8af40cad1b6aeb1e4ed3f16366558f74ba91" have entirely different histories.
e2805f9af2 ... dd0d8af40c
@@ -2,4 +2,3 @@ jupyter==1.0.0
 pandas==1.5.0
 pdf-oralia==0.3.11
 pydantic==2.6.1
-click==8.1.7
@@ -1,86 +0,0 @@
-import logging
-from logging.config import dictConfig
-from pathlib import Path
-
-import click
-
-from .flux import consume_fluxes
-
-DATA_PATH = Path("datas/")
-assert DATA_PATH.exists()
-HISTORY_PATH = DATA_PATH / "Histoire"
-assert HISTORY_PATH.exists()
-STAGING_PATH = DATA_PATH / "staging"
-assert STAGING_PATH.exists()
-GOLD_PATH = DATA_PATH / "gold"
-assert GOLD_PATH.exists()
-MART_PATH = DATA_PATH / "datamart"
-assert MART_PATH.exists()
-
-
-@click.group()
-@click.option("--debug/--no-debug", default=False)
-def main(debug):
-    if debug:
-        logging_level = logging.DEBUG
-    else:
-        logging_level = logging.INFO
-    logging_config = dict(
-        version=1,
-        formatters={"f": {"format": "%(levelname)-8s %(name)-12s %(message)s"}},
-        handlers={
-            "h": {
-                "class": "logging.StreamHandler",
-                "formatter": "f",
-                "level": logging_level,
-            }
-        },
-        root={
-            "handlers": ["h"],
-            "level": logging_level,
-        },
-    )
-
-    dictConfig(logging_config)
-
-
-@main.command()
-def ingest():
-    from .history_stagging import FLUXES_CRG
-
-    history_crg_path = HISTORY_PATH / "CRG"
-    assert history_crg_path.exists()
-    staging_crg_path = STAGING_PATH / "CRG"
-    assert staging_crg_path.exists()
-    consume_fluxes(
-        fluxes=FLUXES_CRG,
-        origin_path=history_crg_path,
-        dest_path=staging_crg_path,
-    )
-
-
-@main.command()
-def feature():
-    from .stagging_gold import FLUXES_CRG
-
-    staging_crg_path = STAGING_PATH / "CRG"
-    assert staging_crg_path.exists()
-    gold_crg_path = GOLD_PATH / "CRG"
-    assert gold_crg_path.exists()
-
-    consume_fluxes(
-        fluxes=FLUXES_CRG(staging_crg_path),
-        origin_path=staging_crg_path,
-        dest_path=gold_crg_path,
-    )
-
-
-@main.command()
-def datamart():
-    from .gold_mart import FLUXES_LOT
-
-    consume_fluxes(fluxes=FLUXES_LOT, origin_path=GOLD_PATH, dest_path=MART_PATH)
-
-
-if __name__ == "__main__":
-    main()
@@ -6,6 +6,9 @@ from pathlib import Path
 import pandas as pd
 from pydantic import BaseModel
 
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.DEBUG)
+
 
 class Source(BaseModel):
     filename: str
@@ -20,7 +23,7 @@ class ExcelSource(Source):
 
     def get_df(self, base_path: Path) -> pd.DataFrame:
         filepath = base_path / self.filename
-        logging.debug(f"Get content of {filepath}")
+        logger.debug(f"Get content of {filepath}")
         return pd.read_excel(filepath, sheet_name=self.sheet_name)
 
 
@@ -29,7 +32,7 @@ class CSVSource(Source):
 
     def get_df(self, base_path: Path) -> pd.DataFrame:
         filepath = base_path / self.filename
-        logging.debug(f"Get content of {filepath}")
+        logger.debug(f"Get content of {filepath}")
         return pd.read_csv(filepath, **self.options)
 
 
@@ -129,16 +132,15 @@ def consume_fluxes(
     wrote_files = []
 
     for name, flux in fluxes.items():
-        logging.info(f"Consume {name}")
+        print(name)
+        logger.info(f"Processing flux {name}")
         src_df = []
         for filename, df in extract_sources(flux.sources, origin_path):
-            logging.info(f"Extracting {filename}")
             df, duplicated = split_duplicates(df, str(filename), duplicated)
             src_df.append(df)
 
-        logging.info(f"Execute {flux.transformation.function.__name__}")
         df = flux.transformation.function(src_df, **flux.transformation.extra_kwrds)
 
         files = flux.destination.write(df, dest_path, writing_func)
-        logging.info(f"{files} written")
         wrote_files += files
     return wrote_files
@@ -18,22 +18,13 @@ logger.setLevel(logging.DEBUG)
 
 
 def build_lots(dfs: list[pd.DataFrame]) -> pd.DataFrame:
-    df = pd.concat(dfs)
+    df = dfs[0]
     df = df.assign(
         Impact=df["Crédit"] - df["Débit"],
     )
     return df
 
 
-FLUXES_LOT = {
-    "Lots": Flux(
-        sources=[CSVSource(filename="CRG/crg-*.csv")],
-        transformation=Transformation(function=build_lots),
-        destination=SplitDestination(name="Lot/lot", split_column="Lot"),
-    ),
-}
-
-
 def build_pnl(dfs: list[pd.DataFrame], year: int) -> pd.DataFrame:
     df = pd.concat(dfs)
     df = df[df["Année"] == year]
@@ -49,10 +40,8 @@ def build_pnl_flux(year: int) -> Flux:
             CSVSource(filename=f"CRG/crg-{year}.csv"),
             CSVSource(filename=f"banque/banque-{year}.csv"),
         ],
-        transformation=Transformation(
-            function=build_pnl,
+        transformation=build_pnl,
         extra_kwrds={"year": year},
-        ),
         destination=Destination(name=f"pnl/{year}"),
     )
 
@@ -69,15 +58,18 @@ if __name__ == "__main__":
     mart_path = data_path / "datamart"
     assert mart_path.exists()
 
-    files = consume_fluxes(
-        fluxes=FLUXES_LOT, origin_path=gold_path, dest_path=mart_path
-    )
+    lot_fluxes = {
+        "Lots": Flux(
+            sources=[CSVSource(filename="CRG/crg-*.csv")],
+            transformation=Transformation(function=build_lots),
+            destination=SplitDestination(name="Lot/lot", split_column="Lot"),
+        ),
+    }
     years = list(range(2017, 2024))
     # pnl_fluxes = {f"pnl-{year}": build_pnl_flux(year) for year in years}
     pnl_fluxes = {}
 
     files = consume_fluxes(
-        fluxes=pnl_fluxes, origin_path=gold_path, dest_path=mart_path
+        fluxes={**lot_fluxes, **pnl_fluxes}, origin_path=gold_path, dest_path=mart_path
     )
     print(files)
@@ -22,12 +22,6 @@ def extract_cat(cat: pd.DataFrame):
     return trans, cat_drop
 
 
-def lot_naming(value):
-    if str(value).isnumeric():
-        return str(value).zfill(2)
-    return "PC"
-
-
 def trans_2017_2021(
     dfs: list[pd.DataFrame], stagging_columns: list[str], **kwrds
 ) -> pd.DataFrame:
@@ -41,7 +35,7 @@ def trans_2017_2021(
         Porte=df["porte"],
         Débit=df["Débit"].fillna(0),
         Crédit=df["Crédit"].fillna(0),
-        Lot=df["porte"].apply(lot_naming),
+        Lot=df["immeuble"].astype(str) + df["porte"].astype("str").str.zfill(2),
         Année=df["Date"].astype(str).str.slice(0, 4),
         Mois=df["Date"].astype(str).str.slice(5, 7),
         Catégorie=df["Categorie"].replace(cat_trans),
@@ -60,7 +54,7 @@ def trans_2022_charge(
         Porte=df["lot"],
         Débit=df["Débits"].fillna(0),
         Crédit=df["Crédits"].fillna(0),
-        Lot=df["lot"].apply(lot_naming),
+        Lot=df["immeuble"].astype(str)[0] + df["lot"].astype("str").str.zfill(2),
         Année=df["annee"],
         Mois=df["mois"],
         Catégorie=df["Catégorie Charge"],
@@ -81,7 +75,7 @@ def trans_2022_loc(
         Porte=df["lot"],
         Débit=0,
         Crédit=df["Réglés"].fillna(0),
-        Lot=df["lot"].apply(lot_naming),
+        Lot=df["immeuble"].astype(str)[0] + df["lot"].astype("str").str.zfill(2),
         Année=df["annee"],
         Mois=df["mois"],
         Catégorie="Loyer Charge",
@@ -99,7 +93,7 @@ def trans_2023(
     df = df.assign(
         Débit=df["Débit"].fillna(0),
         Crédit=df["Crédit"].fillna(0),
-        Lot=lot_naming(df["Porte"]),
+        Lot=df["Immeuble"].astype(str) + df["Porte"].astype("str").str.zfill(2),
         Année=year,
     )
     return df[stagging_columns]
@@ -119,7 +113,7 @@ STAGGING_COLUMNS = [
     "Crédit",
 ]
 
-FLUXES_CRG = {
+FLUXES = {
     "2017 2021 - charge et locataire.xlsx": Flux(
         sources=[
             ExcelSource(
@@ -185,7 +179,7 @@ if __name__ == "__main__":
     assert staging_crg_path.exists()
 
     crg_files = consume_fluxes(
-        fluxes=FLUXES_CRG,
+        fluxes=FLUXES,
         origin_path=history_crg_path,
         dest_path=staging_crg_path,
     )
@@ -14,7 +14,6 @@ def feature_crg(dfs: list[pd.DataFrame]) -> pd.DataFrame:
     df = dfs[0]
     df = df.assign(
         Impact=df["Crédit"] - df["Débit"],
-        Lot=df["Immeuble"].astype(str) + df["Lot"].astype("str"),
     )
     return df
 
@@ -48,12 +47,6 @@ def build_crg_fluxes(
     return fluxes
 
 
-def FLUXES_CRG(staging_crg_path: Path):
-    return build_crg_fluxes(
-        crg_path=staging_crg_path, pattern="*.csv", transformation=feature_crg
-    )
-
-
 if __name__ == "__main__":
     data_path = Path("datas/")
     assert data_path.exists()
@@ -68,9 +61,10 @@ if __name__ == "__main__":
     gold_crg_path = gold_path / "CRG"
     assert gold_crg_path.exists()
 
+    fluxes = build_crg_fluxes(
+        crg_path=staging_crg_path, pattern="*.csv", transformation=feature_crg
+    )
     crg_files = consume_fluxes(
-        fluxes=FLUXES_CRG(staging_crg_path),
-        origin_path=staging_crg_path,
-        dest_path=gold_crg_path,
+        fluxes=fluxes, origin_path=staging_crg_path, dest_path=gold_crg_path
     )
     print(crg_files)