Refact: flux have 3 components source, tranformation, destination

This commit is contained in:
Bertrand Benjamin 2024-03-03 08:05:25 +01:00
parent fcff40adb7
commit dd0d8af40c
4 changed files with 65 additions and 38 deletions

View File

@ -36,16 +36,43 @@ class CSVSource(Source):
return pd.read_csv(filepath, **self.options) return pd.read_csv(filepath, **self.options)
class Transformation(BaseModel):
function: Callable
extra_kwrds: dict = {}
class Destination(BaseModel): class Destination(BaseModel):
name: str name: str
def write(
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable
) -> list[Path]:
dest_basename = dest_path / self.name
return [writing_func(df, dest_basename)]
class SplitDestination(Destination):
split_column: str
def write(
self, df: pd.DataFrame, dest_path: Path, writing_func: Callable
) -> list[Path]:
wrote_files = []
for col_value in df[self.split_column].unique():
filtered_df = df[df[self.split_column] == col_value]
dest_basename = dest_path / f"{self.name}-{col_value}"
dest = writing_func(filtered_df, dest_basename)
wrote_files.append(dest)
return wrote_files
class Flux(BaseModel): class Flux(BaseModel):
sources: list[Source] sources: list[Source]
transformation: Callable transformation: Transformation
extra_kwrds: dict = {}
destination: Destination destination: Destination
split_destination: str = ""
def to_csv(df, dest_basename: Path) -> Path: def to_csv(df, dest_basename: Path) -> Path:
@ -112,18 +139,8 @@ def consume_fluxes(
df, duplicated = split_duplicates(df, str(filename), duplicated) df, duplicated = split_duplicates(df, str(filename), duplicated)
src_df.append(df) src_df.append(df)
df = flux.transformation(src_df, **flux.extra_kwrds) df = flux.transformation.function(src_df, **flux.transformation.extra_kwrds)
if flux.split_destination: files = flux.destination.write(df, dest_path, writing_func)
files = write_split_by(
df=df,
column=flux.split_destination,
dest_path=dest_path,
name=flux.destination.name,
writing_func=writing_func,
)
else:
dest_basename = dest_path / flux.destination.name
files = [writing_func(df, dest_basename)]
wrote_files += files wrote_files += files
return wrote_files return wrote_files

View File

@ -4,7 +4,14 @@ from pathlib import Path
import pandas as pd import pandas as pd
from scripts.flux import CSVSource, Destination, Flux, consume_fluxes from scripts.flux import (
CSVSource,
Destination,
Flux,
SplitDestination,
Transformation,
consume_fluxes,
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
@ -54,9 +61,8 @@ if __name__ == "__main__":
lot_fluxes = { lot_fluxes = {
"Lots": Flux( "Lots": Flux(
sources=[CSVSource(filename="CRG/crg-*.csv")], sources=[CSVSource(filename="CRG/crg-*.csv")],
transformation=build_lots, transformation=Transformation(function=build_lots),
destination=Destination(name="Lot/lot"), destination=SplitDestination(name="Lot/lot", split_column="Lot"),
split_destination="Lot",
), ),
} }
years = list(range(2017, 2024)) years = list(range(2017, 2024))

View File

@ -5,7 +5,7 @@ import pandas as pd
from scripts.flux import consume_fluxes from scripts.flux import consume_fluxes
from .flux import Destination, ExcelSource, Flux from .flux import Destination, ExcelSource, Flux, SplitDestination, Transformation
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
@ -124,28 +124,31 @@ FLUXES = {
sheet_name="Catégories", sheet_name="Catégories",
), ),
], ],
transformation=trans_2017_2021, transformation=Transformation(
extra_kwrds={"stagging_columns": STAGGING_COLUMNS}, function=trans_2017_2021,
destination=Destination(name="crg"), extra_kwrds={"stagging_columns": STAGGING_COLUMNS},
split_destination="Année", ),
destination=SplitDestination(name="crg", split_column="Année"),
), ),
"2022 - charge.xlsx": Flux( "2022 - charge.xlsx": Flux(
sources=[ sources=[
ExcelSource(filename="2022 - charge.xlsx", sheet_name="Sheet1"), ExcelSource(filename="2022 - charge.xlsx", sheet_name="Sheet1"),
], ],
transformation=trans_2022_charge, transformation=Transformation(
extra_kwrds={"stagging_columns": STAGGING_COLUMNS}, function=trans_2022_charge,
destination=Destination(name="crg"), extra_kwrds={"stagging_columns": STAGGING_COLUMNS},
split_destination="Année", ),
destination=SplitDestination(name="crg", split_column="Année"),
), ),
"2022 - locataire.xlsx": Flux( "2022 - locataire.xlsx": Flux(
sources=[ sources=[
ExcelSource(filename="2022 - locataire.xlsx", sheet_name="Sheet1"), ExcelSource(filename="2022 - locataire.xlsx", sheet_name="Sheet1"),
], ],
transformation=trans_2022_loc, transformation=Transformation(
extra_kwrds={"stagging_columns": STAGGING_COLUMNS}, function=trans_2022_loc,
destination=Destination(name="crg"), extra_kwrds={"stagging_columns": STAGGING_COLUMNS},
split_destination="Année", ),
destination=SplitDestination(name="crg", split_column="Année"),
), ),
"2023 - charge et locataire.xlsx": Flux( "2023 - charge et locataire.xlsx": Flux(
sources=[ sources=[
@ -154,10 +157,11 @@ FLUXES = {
sheet_name="DB CRG 2023 ...", sheet_name="DB CRG 2023 ...",
), ),
], ],
transformation=trans_2023, transformation=Transformation(
extra_kwrds={"year": 2023, "stagging_columns": STAGGING_COLUMNS}, function=trans_2023,
destination=Destination(name="crg"), extra_kwrds={"year": 2023, "stagging_columns": STAGGING_COLUMNS},
split_destination="Année", ),
destination=SplitDestination(name="crg", split_column="Année"),
), ),
} }

View File

@ -4,7 +4,7 @@ from pathlib import Path
import pandas as pd import pandas as pd
from scripts.flux import CSVSource, Destination, Flux, consume_fluxes from scripts.flux import CSVSource, Destination, Flux, Transformation, consume_fluxes
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
@ -41,7 +41,7 @@ def build_crg_fluxes(
for file in crg_path.glob(pattern): for file in crg_path.glob(pattern):
fluxes[f"CRG - {file.name}"] = Flux( fluxes[f"CRG - {file.name}"] = Flux(
sources=[CSVSource(filename=file.name, options=csv_options)], sources=[CSVSource(filename=file.name, options=csv_options)],
transformation=transformation, transformation=Transformation(function=transformation),
destination=Destination(name=file.name), destination=Destination(name=file.name),
) )
return fluxes return fluxes