From 8a55e6e2cc9d1ba99d713036bcb0e75bd37e7a40 Mon Sep 17 00:00:00 2001 From: Bertrand Benjamin Date: Fri, 16 Jun 2023 08:32:36 +0200 Subject: [PATCH] Feat: marche avec les pdfs tous ensembles --- pdf_oralia/extract.py | 95 ++++++++++++--------- pdf_oralia/extract_charge.py | 68 --------------- pdf_oralia/extract_locataire.py | 81 ------------------ pdf_oralia/join.py | 30 ------- pdf_oralia/pages/__init__.py | 1 + pdf_oralia/pages/charge.py | 72 ++++++++++++++++ pdf_oralia/pages/locataire.py | 134 ++++++++++++++++++++++++++++++ pdf_oralia/pages/patrimoine.py | 4 + pdf_oralia/pages/recapitulatif.py | 34 ++++++++ 9 files changed, 303 insertions(+), 216 deletions(-) delete mode 100644 pdf_oralia/extract_charge.py delete mode 100644 pdf_oralia/extract_locataire.py delete mode 100644 pdf_oralia/join.py create mode 100644 pdf_oralia/pages/__init__.py create mode 100644 pdf_oralia/pages/charge.py create mode 100644 pdf_oralia/pages/locataire.py create mode 100644 pdf_oralia/pages/patrimoine.py create mode 100644 pdf_oralia/pages/recapitulatif.py diff --git a/pdf_oralia/extract.py b/pdf_oralia/extract.py index 98ba3a0..3407f56 100644 --- a/pdf_oralia/extract.py +++ b/pdf_oralia/extract.py @@ -2,13 +2,11 @@ import logging from datetime import datetime from pathlib import Path -import pandas as pd import pdfplumber -from .extract_charge import extract_charge, extract_remise_com -from .extract_locataire import extract_situation_loc +from pdf_oralia.pages import charge, locataire, patrimoine, recapitulatif -charge_table_settings = { +extract_table_settings = { "vertical_strategy": "lines", "horizontal_strategy": "text", } @@ -27,45 +25,63 @@ def extract_date(page_text): return datetime.strptime(words[-1], "%d/%m/%Y") -def extract_from_pdf(pdf, charge_dest, location_dest): - """Build charge_dest and location_dest xlsx file from pdf""" +def extract_building(page_text, buildings=["bloch", "marietton", "servient"]): + for building in buildings: + if building in page_text.lower(): + return building + raise ValueError("Pas d'immeuble trouvé") + + +def catch_malformed_table(tables): + if len(tables) == 2: + return tables[0] + tables[1] + return tables[0] + + +def from_pdf(pdf): + """Build dataframes one about charges and another on loc""" + recapitulatif_tables = [] loc_tables = [] - charge_table = [] + charge_tables = [] + patrimoie_tables = [] - df_1st_charge = extract_remise_com( - pdf.pages[0].extract_table(charge_table_settings) - ) - - for page in pdf.pages[1:]: + for page in pdf.pages: page_text = page.extract_text() - situation_loc_line = [ - l for l in page_text.split("\n") if "SITUATION DES LOCATAIRES" in l - ] date = extract_date(page_text) - mois = date.strftime("%m") - annee = date.strftime("%Y") - if situation_loc_line: - # mois, annee = situation_loc_line[0].split(" ")[-2:] - if loc_tables: - loc_tables.append(page.extract_table()[1:]) - else: - loc_tables.append(page.extract_table()) + additionnal_fields = { + "immeuble": extract_building(page_text), + "mois": date.strftime("%m"), + "annee": date.strftime("%Y"), + } - elif "RECAPITULATIF DES OPERATIONS" in page_text: - if charge_table: - charge_table += page.extract_table(charge_table_settings)[1:] - else: - charge_table = page.extract_table(charge_table_settings) + if recapitulatif.is_it(page_text): + table = page.extract_tables()[0] + extracted = recapitulatif.extract(table, additionnal_fields) + if extracted: + recapitulatif_tables.append(extracted) - df_charge = extract_charge(charge_table) - df_charge_with_1st = pd.concat([df_1st_charge, df_charge]) - df_charge_with_1st.to_excel(charge_dest, sheet_name="Charges", index=False) - logging.info(f"{charge_dest} saved") + elif locataire.is_it(page_text): + tables = page.extract_tables(extract_table_settings)[1:] + table = catch_malformed_table(tables) + extracted = locataire.extract(table, additionnal_fields) + loc_tables.append(extracted) - df_loc = extract_situation_loc(loc_tables, mois=mois, annee=annee) - df_loc = df_loc.assign() - df_loc.to_excel(location_dest, sheet_name="Location", index=False) - logging.info(f"{location_dest} saved") + elif charge.is_it(page_text): + tables = page.extract_tables(extract_table_settings)[1:] + table = catch_malformed_table(tables) + extracted = charge.extract(table, additionnal_fields) + charge_tables.append(extracted) + + elif patrimoine.is_it(page_text): + pass + + else: + raise ValueError("Page non reconnu") + + df_charge = charge.table2df(recapitulatif_tables + charge_tables) + df_loc = locataire.table2df(loc_tables) + + return df_charge, df_loc def extract_save(pdf_file, dest): @@ -75,4 +91,9 @@ def extract_save(pdf_file, dest): xls_locataire = Path(dest) / f"{pdf_file.stem.replace(' ', '_')}_locataire.xlsx" pdf = pdfplumber.open(pdf_file) - extract_from_pdf(pdf, xls_charge, xls_locataire) + df_charge, df_loc = from_pdf(pdf) + + df_charge.to_excel(xls_charge, sheet_name="Charges", index=False) + logging.info(f"{xls_charge} saved") + df_loc.to_excel(xls_locataire, sheet_name="Location", index=False) + logging.info(f"{xls_locataire} saved") diff --git a/pdf_oralia/extract_charge.py b/pdf_oralia/extract_charge.py deleted file mode 100644 index 5f7150a..0000000 --- a/pdf_oralia/extract_charge.py +++ /dev/null @@ -1,68 +0,0 @@ -import logging - -import numpy as np -import pandas as pd - - -def get_lot(x): - """Return lot number from "RECAPITULATIF DES OPERATIONS" """ - if x[:2].isdigit(): - return x[:2] - if x[:1].isdigit(): - return "0" + x[:1] - if x[:2] == "PC": - return "PC" - return "" - - -def extract_charge(table): - """From pdfplumber table extract the charge dataframe""" - df = ( - pd.DataFrame(table[1:], columns=table[0]) - .replace("", np.nan) - .dropna(subset=["Débits", "Crédits"], how="all") - ) - - drop_index = df[ - df["RECAPITULATIF DES OPERATIONS"].str.contains("TOTAUX", case=False) - | df["RECAPITULATIF DES OPERATIONS"].str.contains("Solde créditeur", case=False) - | df["RECAPITULATIF DES OPERATIONS"].str.contains("Solde débiteur", case=False) - | df["RECAPITULATIF DES OPERATIONS"].str.contains( - "Total des reglements locataires", case=False - ) - ].index - df.drop(drop_index, inplace=True) - - df[""].mask( - df["RECAPITULATIF DES OPERATIONS"].str.contains("honoraires", case=False), - "IMI GERANCE", - inplace=True, - ) - - df = df.assign(lot=df["RECAPITULATIF DES OPERATIONS"].map(get_lot)) - - df = df.astype( - { - "Débits": "float64", - "Crédits": "float64", - "Dont T.V.A.": "float64", - "Locatif": "float64", - "Déductible": "float64", - } - ) - - df.columns.values[0] = "Fournisseur" - return df - - -def extract_remise_com(table): - """Extract "remise commercial" from first page""" - df = pd.DataFrame(table[1:], columns=table[0]).replace("", np.nan) - df = df[ - df["RECAPITULATIF DES OPERATIONS"].str.contains( - "Remise commerciale gérance", case=False, na=False - ) - ] - - df.columns.values[0] = "Fournisseur" - return df diff --git a/pdf_oralia/extract_locataire.py b/pdf_oralia/extract_locataire.py deleted file mode 100644 index d47f27f..0000000 --- a/pdf_oralia/extract_locataire.py +++ /dev/null @@ -1,81 +0,0 @@ -import logging - -import pandas as pd - - -def parse_above_loc(content): - row = {} - app, loc, *_ = content.split("\n") - app_ = app.split(" ") - row["lot"] = f"{int(app_[1]):02d}" - row["type"] = " ".join(app_[2:]) - row["locataire"] = loc - return pd.Series(row) - - -def join_row(last, next): - row = [] - for i in range(len(last)): - if last[i] and next[i]: - row.append(f"{last[i]}\n{next[i]}") - elif last[i]: - row.append(last[i]) - elif next[i]: - row.append(next[i]) - else: - row.append("") - return row - - -def join_tables(tables): - - joined = tables[0] - - for t in tables[1:]: - last_row = joined[-1] - if "Totaux" not in last_row[0]: - first_row = t[0] - joined_row = join_row(last_row, first_row) - joined = joined[:-1] + [joined_row] + t[1:] - else: - joined += t - - return joined - - -def extract_situation_loc(tables, mois, annee): - """From pdfplumber table extract locataire df""" - table = join_tables(tables) - try: - df = pd.DataFrame(table[1:], columns=table[0]) - except IndexError: - print(table) - rows = [] - for i, row in df[df["Locataires"] == "Totaux"].iterrows(): - above_row_loc = df.iloc[i - 1]["Locataires"] - up_row = pd.concat( - [ - row, - parse_above_loc(above_row_loc), - ] - ) - - rows.append(up_row) - df_cleaned = pd.concat(rows, axis=1).T - df_cleaned.drop(["Locataires", "", "Période"], axis=1, inplace=True) - - df_cleaned = df_cleaned.astype( - { - "Loyers": "float64", - "Taxes": "float64", - "Provisions": "float64", - "Divers": "float64", - "Total": "float64", - "Réglés": "float64", - "Impayés": "float64", - }, - errors="ignore", - ) - - df_cleaned = df_cleaned.assign(mois=mois, annee=annee) - return df_cleaned diff --git a/pdf_oralia/join.py b/pdf_oralia/join.py deleted file mode 100644 index f4d87b8..0000000 --- a/pdf_oralia/join.py +++ /dev/null @@ -1,30 +0,0 @@ -import logging -from pathlib import Path - -import pandas as pd - - -def extract_excel_to_dfs(directory, df_names=["charge", "locataire"]): - p = Path(directory) - dfs = {name: [] for name in df_names} - - for file in p.glob("*.xlsx"): - year, month, immeuble, table = file.stem.split("_") - df = pd.read_excel(file, dtype={"lot": str}).assign( - annee=year, mois=month, immeuble=immeuble[:3] - ) - dfs[table].append(df) - - return dfs - - -def join_excel(directory, dest, df_names=["charge", "locataire"]): - dfs = extract_excel_to_dfs(directory, df_names) - destinations = {} - for tablename, datas in dfs.items(): - df = pd.concat(datas) - destination = Path(dest) / f"{tablename}.xlsx" - df.to_excel(destination, index=False) - destinations[tablename] = destination - logging.info(f"{destination} written") - return destinations diff --git a/pdf_oralia/pages/__init__.py b/pdf_oralia/pages/__init__.py new file mode 100644 index 0000000..61bd7b3 --- /dev/null +++ b/pdf_oralia/pages/__init__.py @@ -0,0 +1 @@ +from . import charge, locataire, patrimoine, recapitulatif diff --git a/pdf_oralia/pages/charge.py b/pdf_oralia/pages/charge.py new file mode 100644 index 0000000..a6b6abf --- /dev/null +++ b/pdf_oralia/pages/charge.py @@ -0,0 +1,72 @@ +import numpy as np +import pandas as pd + +RECAPITULATIF_DES_OPERATION = 1 + + +def is_it(page_text): + if ( + "RECAPITULATIF DES OPERATIONS" in page_text + and "COMPTE RENDU DE GESTION" not in page_text + ): + return True + return False + + +def get_lot(x): + """Return lot number from "RECAPITULATIF DES OPERATIONS" """ + if x[:2].isdigit(): + return x[:2] + if x[:1].isdigit(): + return "0" + x[:1] + if x[:2] == "PC": + return "PC" + return "" + + +def keep_row(row): + return not any( + [ + word.lower() in row[RECAPITULATIF_DES_OPERATION].lower() + for word in ["TOTAL", "TOTAUX", "Solde créditeur", "Solde débiteur"] + ] + ) + + +def extract(table, additionnal_fields: dict = {}): + """Turn table to dictionary with additionnal fields""" + extracted = [] + header = table[0] + for row in table[1:]: + if keep_row(row): + r = dict() + for i, value in enumerate(row): + if header[i] == "": + r["Fournisseur"] = value + else: + r[header[i]] = value + + for k, v in additionnal_fields.items(): + r[k] = v + + r["lot"] = get_lot(row[RECAPITULATIF_DES_OPERATION]) + + if "honoraire" in row[RECAPITULATIF_DES_OPERATION]: + r["Fournisseur"] = "IMI GERANCE" + + extracted.append(r) + + return extracted + + +def table2df(tables): + dfs = [] + for table in tables: + df = ( + pd.DataFrame.from_records(table) + .replace("", np.nan) + .dropna(subset=["Débits", "Crédits"], how="all") + ) + df["Fournisseur"] = df["Fournisseur"].fillna(method="ffill") + dfs.append(df) + return pd.concat(dfs) diff --git a/pdf_oralia/pages/locataire.py b/pdf_oralia/pages/locataire.py new file mode 100644 index 0000000..57bdd67 --- /dev/null +++ b/pdf_oralia/pages/locataire.py @@ -0,0 +1,134 @@ +import pandas as pd + + +def is_it(page_text): + if "SITUATION DES LOCATAIRES" in page_text: + return True + return False + + +def is_drop(row): + if "totaux" in row[0].lower(): + return True + if not any(row): + return True + return False + + +def extract(table, additionnal_fields: dict = {}): + """Turn table to dictionary with additionnal fields""" + extracted = [] + header = table[0] + for row in table[1:]: + if not is_drop(row): + r = dict() + for i, value in enumerate(row): + if header[i] != "": + r[header[i]] = value + for k, v in additionnal_fields.items(): + r[k] = v + extracted.append(r) + return extracted + + +def join_row(last, next): + row = {} + for key in last: + if last[key] == next[key]: + row[key] = last[key] + elif last[key] and next[key]: + row[key] = f"{last[key]}\n{next[key]}" + elif last[key]: + row[key] = last[key] + elif next[key]: + row[key] = next[key] + else: + row[key] = "" + return row + + +def join_tables(tables): + joined = tables[0] + + for t in tables[1:]: + last_row = joined[-1] + if "totaux" not in last_row["Locataires"].lower(): + first_row = t[0] + joined_row = join_row(last_row, first_row) + joined = joined[:-1] + [joined_row] + t[1:] + else: + joined += t + + return joined + + +def parse_lot(string): + words = string.split(" ") + return {"Lot": words[1], "Type": " ".join(words[2:])} + + +def join_row(table): + joined = [] + for row in table: + if row["Locataires"].startswith("Lot"): + row.update(parse_lot(row["Locataires"])) + row["Locataires"] = "" + joined.append(row) + elif row["Locataires"] == "Rappel de Loyer": + last_row = joined[-1] + row.update( + { + "Lot": last_row["Lot"], + "Type": last_row["Type"], + "Locataires": last_row["Locataires"], + "Divers": "Rappel de Loyer", + } + ) + joined.append(row) + + elif row["Locataires"]: + last_row = joined.pop() + row_name = row["Locataires"].replace("\n", " ") + row.update({k: v for k, v in last_row.items() if v}) + row["Locataires"] = last_row["Locataires"] + " " + row_name + joined.append(row) + + else: + if row["Période"].startswith("Solde"): + last_row = joined.pop() + row.update( + { + "Lot": last_row["Lot"], + "Type": last_row["Type"], + "Locataires": last_row["Locataires"], + } + ) + joined.append(row) + + elif row["Période"].startswith("Du"): + last_row = joined[-1] + row.update( + { + "Lot": last_row["Lot"], + "Type": last_row["Type"], + "Locataires": last_row["Locataires"], + } + ) + joined.append(row) + else: + print(row) + + return joined + + +def flat_tables(tables): + tables_flat = [] + for table in tables: + tables_flat.extend(table) + return tables_flat + + +def table2df(tables): + tables = flat_tables(tables) + joined = join_row(tables) + return pd.DataFrame.from_records(joined) diff --git a/pdf_oralia/pages/patrimoine.py b/pdf_oralia/pages/patrimoine.py new file mode 100644 index 0000000..3541ea7 --- /dev/null +++ b/pdf_oralia/pages/patrimoine.py @@ -0,0 +1,4 @@ +def is_it(page_text): + if "VOTRE PATRIMOINE" in page_text: + return True + return False diff --git a/pdf_oralia/pages/recapitulatif.py b/pdf_oralia/pages/recapitulatif.py new file mode 100644 index 0000000..783b590 --- /dev/null +++ b/pdf_oralia/pages/recapitulatif.py @@ -0,0 +1,34 @@ +import numpy as np +import pandas as pd + + +def is_it(page_text): + if "COMPTE RENDU DE GESTION" in page_text: + return True + return False + + +def extract(table, additionnal_fields: dict = {}): + """Extract "remise commercial" from first page""" + extracted = [] + header = table[0] + for row in table[1:]: + if "Remise commerciale gérance" in row: + r = dict() + for i, value in enumerate(row): + r[header[i]] = value + for k, v in additionnal_fields.items(): + r[k] = v + extracted.append(r) + + return extracted + + # df = pd.DataFrame(table[1:], columns=table[0]).replace("", np.nan) + # df = df[ + # df["RECAPITULATIF DES OPERATIONS"].str.contains( + # "Remise commerciale gérance", case=False, na=False + # ) + # ] + # + # df.columns.values[0] = "Fournisseur" + # return df