Compare commits
10 Commits
2f77206b8f
...
dd0d8af40c
Author | SHA1 | Date | |
---|---|---|---|
dd0d8af40c | |||
fcff40adb7 | |||
dec284bde1 | |||
d0961b0909 | |||
25ede1789a | |||
9e5541a770 | |||
bd866dda36 | |||
f56edac92c | |||
3916915e22 | |||
b62ea3f5ae |
45
Makefile
45
Makefile
@ -1,10 +1,49 @@
|
|||||||
|
DATA_BASE=./datas
|
||||||
|
|
||||||
|
PDF_BASE=$(DATA_BASE)/pdfs
|
||||||
|
PDF_YEARS=$(wildcard $(PDF_BASE)/*)
|
||||||
|
|
||||||
|
RAW_BASE=$(DATA_BASE)/raw
|
||||||
|
RAW_CRG=$(RAW_BASE)/CRG
|
||||||
|
RAW_CRG_YEARS=$(subst $(PDF_BASE), $(RAW_CRG), $(PDF_YEARS))
|
||||||
|
|
||||||
|
|
||||||
|
$(RAW_CRG)/%/: $(wildcard $(PDF_BASE)/%/*)
|
||||||
|
echo $(wildcard $(PDF_BASE)/$*/*)
|
||||||
|
@echo ----
|
||||||
|
ls $(PDF_BASE)/$*/
|
||||||
|
@echo ----
|
||||||
|
echo $*
|
||||||
|
@echo ----
|
||||||
|
echo $^
|
||||||
|
@echo ----
|
||||||
|
echo $?
|
||||||
|
|
||||||
|
#./datas/raw/CRG/%:
|
||||||
|
#pdf-oralia extract all --src $$year --dest $$(subst $$PDF_BASE, $$RAW_CRG, $$year)
|
||||||
|
# $(RAW_CRG_YEARS): $(PDF_PATHS)
|
||||||
|
# for year in $(PDF_PATHS); do \
|
||||||
|
# echo $$year; \
|
||||||
|
# echo $$(subst $$PDF_BASE, $$RAW_CRG, $$year); \
|
||||||
|
# echo "----"; \
|
||||||
|
# done;
|
||||||
|
|
||||||
|
extract_pdfs:
|
||||||
|
for year in 2021 2022 2023 2024; do \
|
||||||
|
mkdir -p $(RAW_CRG)/$$year/extracted;\
|
||||||
|
pdf-oralia extract all --src $(PDF_BASE)/$$year/ --dest $(RAW_CRG)/$$year/extracted; \
|
||||||
|
pdf-oralia join --src $(RAW_CRG)/$$year/extracted/ --dest $(RAW_CRG)/$$year/; \
|
||||||
|
done
|
||||||
|
|
||||||
clean_raw:
|
clean_raw:
|
||||||
rm -rf ./PLESNA Compta SYSTEM/raw/**/*.csv
|
rm -rf ./PLESNA Compta SYSTEM/raw/**/*.csv
|
||||||
|
|
||||||
clean_built:
|
clean_built:
|
||||||
rm -rf ./PLESNA Compta SYSTEM/staging/**/*.csv
|
rm -rf $(DATA_BASE)/staging/**/*.csv
|
||||||
rm -rf ./PLESNA Compta SYSTEM/gold/**/*.csv
|
rm -rf $(DATA_BASE)/gold/**/*.csv
|
||||||
rm -rf ./PLESNA Compta SYSTEM/datamart/**/*.csv
|
rm -rf $(DATA_BASE)/datamart/**/*.csv
|
||||||
|
|
||||||
clean_all: clean_built clean_raw
|
clean_all: clean_built clean_raw
|
||||||
|
|
||||||
|
import_nextcloud:
|
||||||
|
rsync -a ~/Nextcloud/PLESNA\ Compta\ SYSTEM/Histoire/ ./datas/Histoire
|
||||||
|
4
requirements.txt
Normal file
4
requirements.txt
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
jupyter==1.0.0
|
||||||
|
pandas==1.5.0
|
||||||
|
pdf-oralia==0.3.11
|
||||||
|
pydantic==2.6.1
|
146
scripts/flux.py
Normal file
146
scripts/flux.py
Normal file
@ -0,0 +1,146 @@
|
|||||||
|
import logging
|
||||||
|
from abc import abstractmethod
|
||||||
|
from collections.abc import Callable
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
logger.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
|
||||||
|
class Source(BaseModel):
|
||||||
|
filename: str
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def get_df(self) -> pd.DataFrame:
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
|
class ExcelSource(Source):
|
||||||
|
sheet_name: str
|
||||||
|
|
||||||
|
def get_df(self, base_path: Path) -> pd.DataFrame:
|
||||||
|
filepath = base_path / self.filename
|
||||||
|
logger.debug(f"Get content of {filepath}")
|
||||||
|
return pd.read_excel(filepath, sheet_name=self.sheet_name)
|
||||||
|
|
||||||
|
|
||||||
|
class CSVSource(Source):
|
||||||
|
options: dict = {}
|
||||||
|
|
||||||
|
def get_df(self, base_path: Path) -> pd.DataFrame:
|
||||||
|
filepath = base_path / self.filename
|
||||||
|
logger.debug(f"Get content of {filepath}")
|
||||||
|
return pd.read_csv(filepath, **self.options)
|
||||||
|
|
||||||
|
|
||||||
|
class Transformation(BaseModel):
|
||||||
|
function: Callable
|
||||||
|
extra_kwrds: dict = {}
|
||||||
|
|
||||||
|
|
||||||
|
class Destination(BaseModel):
|
||||||
|
name: str
|
||||||
|
|
||||||
|
def write(
|
||||||
|
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable
|
||||||
|
) -> list[Path]:
|
||||||
|
dest_basename = dest_path / self.name
|
||||||
|
return [writing_func(df, dest_basename)]
|
||||||
|
|
||||||
|
|
||||||
|
class SplitDestination(Destination):
|
||||||
|
split_column: str
|
||||||
|
|
||||||
|
def write(
|
||||||
|
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable
|
||||||
|
) -> list[Path]:
|
||||||
|
wrote_files = []
|
||||||
|
|
||||||
|
for col_value in df[self.split_column].unique():
|
||||||
|
filtered_df = df[df[self.split_column] == col_value]
|
||||||
|
|
||||||
|
dest_basename = dest_path / f"{self.name}-{col_value}"
|
||||||
|
dest = writing_func(filtered_df, dest_basename)
|
||||||
|
wrote_files.append(dest)
|
||||||
|
|
||||||
|
return wrote_files
|
||||||
|
|
||||||
|
|
||||||
|
class Flux(BaseModel):
|
||||||
|
sources: list[Source]
|
||||||
|
transformation: Transformation
|
||||||
|
destination: Destination
|
||||||
|
|
||||||
|
|
||||||
|
def to_csv(df, dest_basename: Path) -> Path:
|
||||||
|
dest = dest_basename.parent / (dest_basename.stem + ".csv")
|
||||||
|
if dest.exists():
|
||||||
|
df.to_csv(dest, mode="a", header=False, index=False)
|
||||||
|
else:
|
||||||
|
df.to_csv(dest, index=False)
|
||||||
|
return dest
|
||||||
|
|
||||||
|
|
||||||
|
def write_split_by(
|
||||||
|
df: pd.DataFrame, column: str, dest_path: Path, name: str, writing_func
|
||||||
|
) -> list[Path]:
|
||||||
|
wrote_files = []
|
||||||
|
|
||||||
|
for col_value in df[column].unique():
|
||||||
|
filtered_df = df[df[column] == col_value]
|
||||||
|
|
||||||
|
dest_basename = dest_path / f"{name}-{col_value}"
|
||||||
|
dest = writing_func(filtered_df, dest_basename)
|
||||||
|
wrote_files.append(dest)
|
||||||
|
|
||||||
|
return wrote_files
|
||||||
|
|
||||||
|
|
||||||
|
def extract_sources(sources: list[Source], base_path: Path = Path()):
|
||||||
|
for src in sources:
|
||||||
|
if "*" in src.filename:
|
||||||
|
expanded_src = [
|
||||||
|
src.model_copy(update={"filename": str(p.relative_to(base_path))})
|
||||||
|
for p in base_path.glob(src.filename)
|
||||||
|
]
|
||||||
|
yield from extract_sources(expanded_src, base_path)
|
||||||
|
else:
|
||||||
|
filepath = base_path / src.filename
|
||||||
|
assert filepath.exists
|
||||||
|
yield src.filename, src.get_df(base_path)
|
||||||
|
|
||||||
|
|
||||||
|
def split_duplicates(
|
||||||
|
df, origin: str, duplicated: dict[str, pd.DataFrame]
|
||||||
|
) -> [pd.DataFrame, dict[str, pd.DataFrame]]:
|
||||||
|
duplicates = df.duplicated()
|
||||||
|
no_duplicates = df[~duplicates]
|
||||||
|
duplicated[origin] = df[duplicates]
|
||||||
|
return no_duplicates, duplicated
|
||||||
|
|
||||||
|
|
||||||
|
def consume_fluxes(
|
||||||
|
fluxes: dict[str, Flux],
|
||||||
|
origin_path: Path,
|
||||||
|
dest_path: Path,
|
||||||
|
writing_func=to_csv,
|
||||||
|
):
|
||||||
|
duplicated = {}
|
||||||
|
wrote_files = []
|
||||||
|
|
||||||
|
for name, flux in fluxes.items():
|
||||||
|
print(name)
|
||||||
|
logger.info(f"Processing flux {name}")
|
||||||
|
src_df = []
|
||||||
|
for filename, df in extract_sources(flux.sources, origin_path):
|
||||||
|
df, duplicated = split_duplicates(df, str(filename), duplicated)
|
||||||
|
src_df.append(df)
|
||||||
|
|
||||||
|
df = flux.transformation.function(src_df, **flux.transformation.extra_kwrds)
|
||||||
|
|
||||||
|
files = flux.destination.write(df, dest_path, writing_func)
|
||||||
|
wrote_files += files
|
||||||
|
return wrote_files
|
75
scripts/gold_mart.py
Normal file
75
scripts/gold_mart.py
Normal file
@ -0,0 +1,75 @@
|
|||||||
|
import logging
|
||||||
|
from collections.abc import Callable
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
from scripts.flux import (
|
||||||
|
CSVSource,
|
||||||
|
Destination,
|
||||||
|
Flux,
|
||||||
|
SplitDestination,
|
||||||
|
Transformation,
|
||||||
|
consume_fluxes,
|
||||||
|
)
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
logger.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
|
||||||
|
def build_lots(dfs: list[pd.DataFrame]) -> pd.DataFrame:
|
||||||
|
df = dfs[0]
|
||||||
|
df = df.assign(
|
||||||
|
Impact=df["Crédit"] - df["Débit"],
|
||||||
|
)
|
||||||
|
return df
|
||||||
|
|
||||||
|
|
||||||
|
def build_pnl(dfs: list[pd.DataFrame], year: int) -> pd.DataFrame:
|
||||||
|
df = pd.concat(dfs)
|
||||||
|
df = df[df["Année"] == year]
|
||||||
|
pt = df.groupby(["Catégorie", "Mois"]).agg("sum").unstack().fillna(0)
|
||||||
|
pt.columns = [c[1] for c in pt.columns]
|
||||||
|
pt.reset_index(["Catégorie"])
|
||||||
|
return pt
|
||||||
|
|
||||||
|
|
||||||
|
def build_pnl_flux(year: int) -> Flux:
|
||||||
|
return Flux(
|
||||||
|
sources=[
|
||||||
|
CSVSource(filename=f"CRG/crg-{year}.csv"),
|
||||||
|
CSVSource(filename=f"banque/banque-{year}.csv"),
|
||||||
|
],
|
||||||
|
transformation=build_pnl,
|
||||||
|
extra_kwrds={"year": year},
|
||||||
|
destination=Destination(name=f"pnl/{year}"),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
data_path = Path("datas/")
|
||||||
|
assert data_path.exists()
|
||||||
|
|
||||||
|
gold_path = data_path / "gold"
|
||||||
|
assert gold_path.exists()
|
||||||
|
gold_crg_path = gold_path / "CRG"
|
||||||
|
assert gold_crg_path.exists()
|
||||||
|
|
||||||
|
mart_path = data_path / "datamart"
|
||||||
|
assert mart_path.exists()
|
||||||
|
|
||||||
|
lot_fluxes = {
|
||||||
|
"Lots": Flux(
|
||||||
|
sources=[CSVSource(filename="CRG/crg-*.csv")],
|
||||||
|
transformation=Transformation(function=build_lots),
|
||||||
|
destination=SplitDestination(name="Lot/lot", split_column="Lot"),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
years = list(range(2017, 2024))
|
||||||
|
# pnl_fluxes = {f"pnl-{year}": build_pnl_flux(year) for year in years}
|
||||||
|
pnl_fluxes = {}
|
||||||
|
|
||||||
|
files = consume_fluxes(
|
||||||
|
fluxes={**lot_fluxes, **pnl_fluxes}, origin_path=gold_path, dest_path=mart_path
|
||||||
|
)
|
||||||
|
print(files)
|
186
scripts/history_stagging.py
Normal file
186
scripts/history_stagging.py
Normal file
@ -0,0 +1,186 @@
|
|||||||
|
import logging
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
from scripts.flux import consume_fluxes
|
||||||
|
|
||||||
|
from .flux import Destination, ExcelSource, Flux, SplitDestination, Transformation
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
logger.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
|
||||||
|
def extract_cat(cat: pd.DataFrame):
|
||||||
|
cat_drop = list(cat[cat["Nouvelles"] == "NE PAS IMPORTER"]["Anciennes"])
|
||||||
|
cat_trans = cat[cat["Nouvelles"] != "NE PAS IMPORTER"]
|
||||||
|
|
||||||
|
trans = {}
|
||||||
|
for _, (old, new) in cat_trans.iterrows():
|
||||||
|
trans[old] = new
|
||||||
|
|
||||||
|
return trans, cat_drop
|
||||||
|
|
||||||
|
|
||||||
|
def trans_2017_2021(
|
||||||
|
dfs: list[pd.DataFrame], stagging_columns: list[str], **kwrds
|
||||||
|
) -> pd.DataFrame:
|
||||||
|
df, cat = dfs
|
||||||
|
cat_trans, cat_drop = extract_cat(cat)
|
||||||
|
|
||||||
|
df = df[~df["Categorie"].isin(cat_drop)]
|
||||||
|
|
||||||
|
df = df.assign(
|
||||||
|
Immeuble=df["immeuble"],
|
||||||
|
Porte=df["porte"],
|
||||||
|
Débit=df["Débit"].fillna(0),
|
||||||
|
Crédit=df["Crédit"].fillna(0),
|
||||||
|
Lot=df["immeuble"].astype(str) + df["porte"].astype("str").str.zfill(2),
|
||||||
|
Année=df["Date"].astype(str).str.slice(0, 4),
|
||||||
|
Mois=df["Date"].astype(str).str.slice(5, 7),
|
||||||
|
Catégorie=df["Categorie"].replace(cat_trans),
|
||||||
|
Fournisseur="",
|
||||||
|
)
|
||||||
|
|
||||||
|
return df[stagging_columns]
|
||||||
|
|
||||||
|
|
||||||
|
def trans_2022_charge(
|
||||||
|
dfs: list[pd.DataFrame], stagging_columns: list[str], **kwrds
|
||||||
|
) -> pd.DataFrame:
|
||||||
|
df = dfs[0]
|
||||||
|
df = df.assign(
|
||||||
|
Immeuble=df["immeuble"],
|
||||||
|
Porte=df["lot"],
|
||||||
|
Débit=df["Débits"].fillna(0),
|
||||||
|
Crédit=df["Crédits"].fillna(0),
|
||||||
|
Lot=df["immeuble"].astype(str)[0] + df["lot"].astype("str").str.zfill(2),
|
||||||
|
Année=df["annee"],
|
||||||
|
Mois=df["mois"],
|
||||||
|
Catégorie=df["Catégorie Charge"],
|
||||||
|
# Catégorie=df["Catégorie Charge"].replace(trans),
|
||||||
|
Fournisseur="",
|
||||||
|
Régie="Oralia - Gelas",
|
||||||
|
Libellé="",
|
||||||
|
)
|
||||||
|
return df[stagging_columns]
|
||||||
|
|
||||||
|
|
||||||
|
def trans_2022_loc(
|
||||||
|
dfs: list[pd.DataFrame], stagging_columns: list[str], **kwrds
|
||||||
|
) -> pd.DataFrame:
|
||||||
|
df = dfs[0]
|
||||||
|
df = df.assign(
|
||||||
|
Immeuble=df["immeuble"],
|
||||||
|
Porte=df["lot"],
|
||||||
|
Débit=0,
|
||||||
|
Crédit=df["Réglés"].fillna(0),
|
||||||
|
Lot=df["immeuble"].astype(str)[0] + df["lot"].astype("str").str.zfill(2),
|
||||||
|
Année=df["annee"],
|
||||||
|
Mois=df["mois"],
|
||||||
|
Catégorie="Loyer Charge",
|
||||||
|
Fournisseur="",
|
||||||
|
Régie="Oralia - Gelas",
|
||||||
|
Libellé="",
|
||||||
|
)
|
||||||
|
return df[stagging_columns]
|
||||||
|
|
||||||
|
|
||||||
|
def trans_2023(
|
||||||
|
dfs: list[pd.DataFrame], year: str, stagging_columns: list[str], **kwrds
|
||||||
|
) -> pd.DataFrame:
|
||||||
|
df = dfs[0]
|
||||||
|
df = df.assign(
|
||||||
|
Débit=df["Débit"].fillna(0),
|
||||||
|
Crédit=df["Crédit"].fillna(0),
|
||||||
|
Lot=df["Immeuble"].astype(str) + df["Porte"].astype("str").str.zfill(2),
|
||||||
|
Année=year,
|
||||||
|
)
|
||||||
|
return df[stagging_columns]
|
||||||
|
|
||||||
|
|
||||||
|
STAGGING_COLUMNS = [
|
||||||
|
"Régie",
|
||||||
|
"Immeuble",
|
||||||
|
"Porte",
|
||||||
|
"Lot",
|
||||||
|
"Année",
|
||||||
|
"Mois",
|
||||||
|
"Catégorie",
|
||||||
|
"Fournisseur",
|
||||||
|
"Libellé",
|
||||||
|
"Débit",
|
||||||
|
"Crédit",
|
||||||
|
]
|
||||||
|
|
||||||
|
FLUXES = {
|
||||||
|
"2017 2021 - charge et locataire.xlsx": Flux(
|
||||||
|
sources=[
|
||||||
|
ExcelSource(
|
||||||
|
filename="2017 2021 - charge et locataire.xlsx", sheet_name="DB CRG"
|
||||||
|
),
|
||||||
|
ExcelSource(
|
||||||
|
filename="2017 2021 - charge et locataire.xlsx",
|
||||||
|
sheet_name="Catégories",
|
||||||
|
),
|
||||||
|
],
|
||||||
|
transformation=Transformation(
|
||||||
|
function=trans_2017_2021,
|
||||||
|
extra_kwrds={"stagging_columns": STAGGING_COLUMNS},
|
||||||
|
),
|
||||||
|
destination=SplitDestination(name="crg", split_column="Année"),
|
||||||
|
),
|
||||||
|
"2022 - charge.xlsx": Flux(
|
||||||
|
sources=[
|
||||||
|
ExcelSource(filename="2022 - charge.xlsx", sheet_name="Sheet1"),
|
||||||
|
],
|
||||||
|
transformation=Transformation(
|
||||||
|
function=trans_2022_charge,
|
||||||
|
extra_kwrds={"stagging_columns": STAGGING_COLUMNS},
|
||||||
|
),
|
||||||
|
destination=SplitDestination(name="crg", split_column="Année"),
|
||||||
|
),
|
||||||
|
"2022 - locataire.xlsx": Flux(
|
||||||
|
sources=[
|
||||||
|
ExcelSource(filename="2022 - locataire.xlsx", sheet_name="Sheet1"),
|
||||||
|
],
|
||||||
|
transformation=Transformation(
|
||||||
|
function=trans_2022_loc,
|
||||||
|
extra_kwrds={"stagging_columns": STAGGING_COLUMNS},
|
||||||
|
),
|
||||||
|
destination=SplitDestination(name="crg", split_column="Année"),
|
||||||
|
),
|
||||||
|
"2023 - charge et locataire.xlsx": Flux(
|
||||||
|
sources=[
|
||||||
|
ExcelSource(
|
||||||
|
filename="2023 - charge et locataire.xlsx",
|
||||||
|
sheet_name="DB CRG 2023 ...",
|
||||||
|
),
|
||||||
|
],
|
||||||
|
transformation=Transformation(
|
||||||
|
function=trans_2023,
|
||||||
|
extra_kwrds={"year": 2023, "stagging_columns": STAGGING_COLUMNS},
|
||||||
|
),
|
||||||
|
destination=SplitDestination(name="crg", split_column="Année"),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
data_path = Path("datas/")
|
||||||
|
assert data_path.exists()
|
||||||
|
history_path = data_path / "Histoire"
|
||||||
|
assert history_path.exists()
|
||||||
|
history_crg_path = history_path / "CRG"
|
||||||
|
assert history_crg_path.exists()
|
||||||
|
|
||||||
|
staging_path = data_path / "staging"
|
||||||
|
assert staging_path.exists()
|
||||||
|
staging_crg_path = staging_path / "CRG"
|
||||||
|
assert staging_crg_path.exists()
|
||||||
|
|
||||||
|
crg_files = consume_fluxes(
|
||||||
|
fluxes=FLUXES,
|
||||||
|
origin_path=history_crg_path,
|
||||||
|
dest_path=staging_crg_path,
|
||||||
|
)
|
||||||
|
print(crg_files)
|
@ -4,7 +4,7 @@ import pandas as pd
|
|||||||
from pydantic import BaseModel, ValidationError
|
from pydantic import BaseModel, ValidationError
|
||||||
|
|
||||||
|
|
||||||
class Interseptor:
|
class ValidationInterseptor:
|
||||||
def __init__(self, model: BaseModel):
|
def __init__(self, model: BaseModel):
|
||||||
self.model = model
|
self.model = model
|
||||||
self.not_valid_rows = []
|
self.not_valid_rows = []
|
||||||
@ -18,8 +18,10 @@ class Interseptor:
|
|||||||
try:
|
try:
|
||||||
self.model(**r)
|
self.model(**r)
|
||||||
except ValidationError:
|
except ValidationError:
|
||||||
r["InterseptorOrigin"] = func.__name__
|
r["ValidationInterseptorFunc"] = func.__name__
|
||||||
r["InterseptorIndex"] = i
|
r["ValidationInterseptorArgs"] = args
|
||||||
|
r["ValidationInterseptorKwrds"] = kwrds
|
||||||
|
r["ValidationInterseptorIndex"] = i
|
||||||
self.not_valid_rows.append(r)
|
self.not_valid_rows.append(r)
|
||||||
else:
|
else:
|
||||||
valid_rows.append(r)
|
valid_rows.append(r)
|
||||||
|
70
scripts/stagging_gold.py
Normal file
70
scripts/stagging_gold.py
Normal file
@ -0,0 +1,70 @@
|
|||||||
|
import logging
|
||||||
|
from collections.abc import Callable
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
from scripts.flux import CSVSource, Destination, Flux, Transformation, consume_fluxes
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
logger.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
|
||||||
|
def feature_crg(dfs: list[pd.DataFrame]) -> pd.DataFrame:
|
||||||
|
df = dfs[0]
|
||||||
|
df = df.assign(
|
||||||
|
Impact=df["Crédit"] - df["Débit"],
|
||||||
|
)
|
||||||
|
return df
|
||||||
|
|
||||||
|
|
||||||
|
GOLD_COLUMNS = [
|
||||||
|
"Régie",
|
||||||
|
"Immeuble",
|
||||||
|
"Porte",
|
||||||
|
"Lot",
|
||||||
|
"Année",
|
||||||
|
"Mois",
|
||||||
|
"Catégorie",
|
||||||
|
"Fournisseur",
|
||||||
|
"Libellé",
|
||||||
|
"Débit",
|
||||||
|
"Crédit",
|
||||||
|
"Impact",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def build_crg_fluxes(
|
||||||
|
crg_path: Path, pattern: str, transformation: Callable, csv_options: dict = {}
|
||||||
|
) -> dict[str, Flux]:
|
||||||
|
fluxes = {}
|
||||||
|
for file in crg_path.glob(pattern):
|
||||||
|
fluxes[f"CRG - {file.name}"] = Flux(
|
||||||
|
sources=[CSVSource(filename=file.name, options=csv_options)],
|
||||||
|
transformation=Transformation(function=transformation),
|
||||||
|
destination=Destination(name=file.name),
|
||||||
|
)
|
||||||
|
return fluxes
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
data_path = Path("datas/")
|
||||||
|
assert data_path.exists()
|
||||||
|
|
||||||
|
staging_path = data_path / "staging"
|
||||||
|
assert staging_path.exists()
|
||||||
|
staging_crg_path = staging_path / "CRG"
|
||||||
|
assert staging_crg_path.exists()
|
||||||
|
|
||||||
|
gold_path = data_path / "gold"
|
||||||
|
assert gold_path.exists()
|
||||||
|
gold_crg_path = gold_path / "CRG"
|
||||||
|
assert gold_crg_path.exists()
|
||||||
|
|
||||||
|
fluxes = build_crg_fluxes(
|
||||||
|
crg_path=staging_crg_path, pattern="*.csv", transformation=feature_crg
|
||||||
|
)
|
||||||
|
crg_files = consume_fluxes(
|
||||||
|
fluxes=fluxes, origin_path=staging_crg_path, dest_path=gold_crg_path
|
||||||
|
)
|
||||||
|
print(crg_files)
|
@ -4,7 +4,7 @@ import pandas as pd
|
|||||||
import pytest
|
import pytest
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
from scripts.intersept_not_valid import Interseptor
|
from scripts.intersept_not_valid import ValidationInterseptor
|
||||||
|
|
||||||
|
|
||||||
class FakeModel(BaseModel):
|
class FakeModel(BaseModel):
|
||||||
@ -13,7 +13,7 @@ class FakeModel(BaseModel):
|
|||||||
|
|
||||||
|
|
||||||
def test_init_composed():
|
def test_init_composed():
|
||||||
interceptor = Interseptor(FakeModel)
|
interceptor = ValidationInterseptor(FakeModel)
|
||||||
|
|
||||||
def df_generator(nrows=3):
|
def df_generator(nrows=3):
|
||||||
records = [{"name": "plop", "age": random.randint(1, 50)} for _ in range(nrows)]
|
records = [{"name": "plop", "age": random.randint(1, 50)} for _ in range(nrows)]
|
||||||
@ -27,7 +27,7 @@ def test_init_composed():
|
|||||||
|
|
||||||
|
|
||||||
def test_init_decorator():
|
def test_init_decorator():
|
||||||
interceptor = Interseptor(FakeModel)
|
interceptor = ValidationInterseptor(FakeModel)
|
||||||
|
|
||||||
@interceptor
|
@interceptor
|
||||||
def df_generator(nrows=3):
|
def df_generator(nrows=3):
|
||||||
@ -40,7 +40,7 @@ def test_init_decorator():
|
|||||||
|
|
||||||
|
|
||||||
def test_intersept_not_valid():
|
def test_intersept_not_valid():
|
||||||
interceptor = Interseptor(FakeModel)
|
interceptor = ValidationInterseptor(FakeModel)
|
||||||
|
|
||||||
@interceptor
|
@interceptor
|
||||||
def df_generator():
|
def df_generator():
|
||||||
@ -57,7 +57,9 @@ def test_intersept_not_valid():
|
|||||||
{
|
{
|
||||||
"name": "hop",
|
"name": "hop",
|
||||||
"age": "ui",
|
"age": "ui",
|
||||||
"InterseptorOrigin": "df_generator",
|
"ValidationInterseptorFunc": "df_generator",
|
||||||
"InterseptorIndex": 1,
|
"ValidationInterseptorArgs": (),
|
||||||
|
"ValidationInterseptorKwrds": {},
|
||||||
|
"ValidationInterseptorIndex": 1,
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
Loading…
Reference in New Issue
Block a user