import logging from logging.config import dictConfig from pathlib import Path import click from .flux import consume_fluxes DATA_PATH = Path("datas/") assert DATA_PATH.exists() HISTORY_PATH = DATA_PATH / "Histoire" assert HISTORY_PATH.exists() STAGING_PATH = DATA_PATH / "staging" assert STAGING_PATH.exists() GOLD_PATH = DATA_PATH / "gold" assert GOLD_PATH.exists() MART_PATH = DATA_PATH / "datamart" assert MART_PATH.exists() @click.group() @click.option("--debug/--no-debug", default=False) def main(debug): if debug: logging_level = logging.DEBUG else: logging_level = logging.INFO logging_config = dict( version=1, formatters={"f": {"format": "%(levelname)-8s %(name)-12s %(message)s"}}, handlers={ "h": { "class": "logging.StreamHandler", "formatter": "f", "level": logging_level, } }, root={ "handlers": ["h"], "level": logging_level, }, ) dictConfig(logging_config) @main.command() def ingest(): from .history_stagging import FLUXES_CRG history_crg_path = HISTORY_PATH / "CRG" assert history_crg_path.exists() staging_crg_path = STAGING_PATH / "CRG" assert staging_crg_path.exists() consume_fluxes( fluxes=FLUXES_CRG, origin_path=history_crg_path, dest_path=staging_crg_path, ) @main.command() def feature(): from .stagging_gold import FLUXES_CRG staging_crg_path = STAGING_PATH / "CRG" assert staging_crg_path.exists() gold_crg_path = GOLD_PATH / "CRG" assert gold_crg_path.exists() consume_fluxes( fluxes=FLUXES_CRG(staging_crg_path), origin_path=staging_crg_path, dest_path=gold_crg_path, ) @main.command() def datamart(): from .gold_mart import FLUXES_LOT consume_fluxes(fluxes=FLUXES_LOT, origin_path=GOLD_PATH, dest_path=MART_PATH) if __name__ == "__main__": main()