Feat: marche avec les pdfs tous ensembles

This commit is contained in:
Bertrand Benjamin 2023-06-16 08:32:36 +02:00
parent 1afb2a32ab
commit 8a55e6e2cc
9 changed files with 303 additions and 216 deletions

View File

@ -2,13 +2,11 @@ import logging
from datetime import datetime
from pathlib import Path
import pandas as pd
import pdfplumber
from .extract_charge import extract_charge, extract_remise_com
from .extract_locataire import extract_situation_loc
from pdf_oralia.pages import charge, locataire, patrimoine, recapitulatif
charge_table_settings = {
extract_table_settings = {
"vertical_strategy": "lines",
"horizontal_strategy": "text",
}
@ -27,45 +25,63 @@ def extract_date(page_text):
return datetime.strptime(words[-1], "%d/%m/%Y")
def extract_from_pdf(pdf, charge_dest, location_dest):
"""Build charge_dest and location_dest xlsx file from pdf"""
def extract_building(page_text, buildings=["bloch", "marietton", "servient"]):
for building in buildings:
if building in page_text.lower():
return building
raise ValueError("Pas d'immeuble trouvé")
def catch_malformed_table(tables):
if len(tables) == 2:
return tables[0] + tables[1]
return tables[0]
def from_pdf(pdf):
"""Build dataframes one about charges and another on loc"""
recapitulatif_tables = []
loc_tables = []
charge_table = []
charge_tables = []
patrimoie_tables = []
df_1st_charge = extract_remise_com(
pdf.pages[0].extract_table(charge_table_settings)
)
for page in pdf.pages[1:]:
for page in pdf.pages:
page_text = page.extract_text()
situation_loc_line = [
l for l in page_text.split("\n") if "SITUATION DES LOCATAIRES" in l
]
date = extract_date(page_text)
mois = date.strftime("%m")
annee = date.strftime("%Y")
if situation_loc_line:
# mois, annee = situation_loc_line[0].split(" ")[-2:]
if loc_tables:
loc_tables.append(page.extract_table()[1:])
else:
loc_tables.append(page.extract_table())
additionnal_fields = {
"immeuble": extract_building(page_text),
"mois": date.strftime("%m"),
"annee": date.strftime("%Y"),
}
elif "RECAPITULATIF DES OPERATIONS" in page_text:
if charge_table:
charge_table += page.extract_table(charge_table_settings)[1:]
else:
charge_table = page.extract_table(charge_table_settings)
if recapitulatif.is_it(page_text):
table = page.extract_tables()[0]
extracted = recapitulatif.extract(table, additionnal_fields)
if extracted:
recapitulatif_tables.append(extracted)
df_charge = extract_charge(charge_table)
df_charge_with_1st = pd.concat([df_1st_charge, df_charge])
df_charge_with_1st.to_excel(charge_dest, sheet_name="Charges", index=False)
logging.info(f"{charge_dest} saved")
elif locataire.is_it(page_text):
tables = page.extract_tables(extract_table_settings)[1:]
table = catch_malformed_table(tables)
extracted = locataire.extract(table, additionnal_fields)
loc_tables.append(extracted)
df_loc = extract_situation_loc(loc_tables, mois=mois, annee=annee)
df_loc = df_loc.assign()
df_loc.to_excel(location_dest, sheet_name="Location", index=False)
logging.info(f"{location_dest} saved")
elif charge.is_it(page_text):
tables = page.extract_tables(extract_table_settings)[1:]
table = catch_malformed_table(tables)
extracted = charge.extract(table, additionnal_fields)
charge_tables.append(extracted)
elif patrimoine.is_it(page_text):
pass
else:
raise ValueError("Page non reconnu")
df_charge = charge.table2df(recapitulatif_tables + charge_tables)
df_loc = locataire.table2df(loc_tables)
return df_charge, df_loc
def extract_save(pdf_file, dest):
@ -75,4 +91,9 @@ def extract_save(pdf_file, dest):
xls_locataire = Path(dest) / f"{pdf_file.stem.replace(' ', '_')}_locataire.xlsx"
pdf = pdfplumber.open(pdf_file)
extract_from_pdf(pdf, xls_charge, xls_locataire)
df_charge, df_loc = from_pdf(pdf)
df_charge.to_excel(xls_charge, sheet_name="Charges", index=False)
logging.info(f"{xls_charge} saved")
df_loc.to_excel(xls_locataire, sheet_name="Location", index=False)
logging.info(f"{xls_locataire} saved")

View File

@ -1,68 +0,0 @@
import logging
import numpy as np
import pandas as pd
def get_lot(x):
"""Return lot number from "RECAPITULATIF DES OPERATIONS" """
if x[:2].isdigit():
return x[:2]
if x[:1].isdigit():
return "0" + x[:1]
if x[:2] == "PC":
return "PC"
return ""
def extract_charge(table):
"""From pdfplumber table extract the charge dataframe"""
df = (
pd.DataFrame(table[1:], columns=table[0])
.replace("", np.nan)
.dropna(subset=["Débits", "Crédits"], how="all")
)
drop_index = df[
df["RECAPITULATIF DES OPERATIONS"].str.contains("TOTAUX", case=False)
| df["RECAPITULATIF DES OPERATIONS"].str.contains("Solde créditeur", case=False)
| df["RECAPITULATIF DES OPERATIONS"].str.contains("Solde débiteur", case=False)
| df["RECAPITULATIF DES OPERATIONS"].str.contains(
"Total des reglements locataires", case=False
)
].index
df.drop(drop_index, inplace=True)
df[""].mask(
df["RECAPITULATIF DES OPERATIONS"].str.contains("honoraires", case=False),
"IMI GERANCE",
inplace=True,
)
df = df.assign(lot=df["RECAPITULATIF DES OPERATIONS"].map(get_lot))
df = df.astype(
{
"Débits": "float64",
"Crédits": "float64",
"Dont T.V.A.": "float64",
"Locatif": "float64",
"Déductible": "float64",
}
)
df.columns.values[0] = "Fournisseur"
return df
def extract_remise_com(table):
"""Extract "remise commercial" from first page"""
df = pd.DataFrame(table[1:], columns=table[0]).replace("", np.nan)
df = df[
df["RECAPITULATIF DES OPERATIONS"].str.contains(
"Remise commerciale gérance", case=False, na=False
)
]
df.columns.values[0] = "Fournisseur"
return df

View File

@ -1,81 +0,0 @@
import logging
import pandas as pd
def parse_above_loc(content):
row = {}
app, loc, *_ = content.split("\n")
app_ = app.split(" ")
row["lot"] = f"{int(app_[1]):02d}"
row["type"] = " ".join(app_[2:])
row["locataire"] = loc
return pd.Series(row)
def join_row(last, next):
row = []
for i in range(len(last)):
if last[i] and next[i]:
row.append(f"{last[i]}\n{next[i]}")
elif last[i]:
row.append(last[i])
elif next[i]:
row.append(next[i])
else:
row.append("")
return row
def join_tables(tables):
joined = tables[0]
for t in tables[1:]:
last_row = joined[-1]
if "Totaux" not in last_row[0]:
first_row = t[0]
joined_row = join_row(last_row, first_row)
joined = joined[:-1] + [joined_row] + t[1:]
else:
joined += t
return joined
def extract_situation_loc(tables, mois, annee):
"""From pdfplumber table extract locataire df"""
table = join_tables(tables)
try:
df = pd.DataFrame(table[1:], columns=table[0])
except IndexError:
print(table)
rows = []
for i, row in df[df["Locataires"] == "Totaux"].iterrows():
above_row_loc = df.iloc[i - 1]["Locataires"]
up_row = pd.concat(
[
row,
parse_above_loc(above_row_loc),
]
)
rows.append(up_row)
df_cleaned = pd.concat(rows, axis=1).T
df_cleaned.drop(["Locataires", "", "Période"], axis=1, inplace=True)
df_cleaned = df_cleaned.astype(
{
"Loyers": "float64",
"Taxes": "float64",
"Provisions": "float64",
"Divers": "float64",
"Total": "float64",
"Réglés": "float64",
"Impayés": "float64",
},
errors="ignore",
)
df_cleaned = df_cleaned.assign(mois=mois, annee=annee)
return df_cleaned

View File

@ -1,30 +0,0 @@
import logging
from pathlib import Path
import pandas as pd
def extract_excel_to_dfs(directory, df_names=["charge", "locataire"]):
p = Path(directory)
dfs = {name: [] for name in df_names}
for file in p.glob("*.xlsx"):
year, month, immeuble, table = file.stem.split("_")
df = pd.read_excel(file, dtype={"lot": str}).assign(
annee=year, mois=month, immeuble=immeuble[:3]
)
dfs[table].append(df)
return dfs
def join_excel(directory, dest, df_names=["charge", "locataire"]):
dfs = extract_excel_to_dfs(directory, df_names)
destinations = {}
for tablename, datas in dfs.items():
df = pd.concat(datas)
destination = Path(dest) / f"{tablename}.xlsx"
df.to_excel(destination, index=False)
destinations[tablename] = destination
logging.info(f"{destination} written")
return destinations

View File

@ -0,0 +1 @@
from . import charge, locataire, patrimoine, recapitulatif

View File

@ -0,0 +1,72 @@
import numpy as np
import pandas as pd
RECAPITULATIF_DES_OPERATION = 1
def is_it(page_text):
if (
"RECAPITULATIF DES OPERATIONS" in page_text
and "COMPTE RENDU DE GESTION" not in page_text
):
return True
return False
def get_lot(x):
"""Return lot number from "RECAPITULATIF DES OPERATIONS" """
if x[:2].isdigit():
return x[:2]
if x[:1].isdigit():
return "0" + x[:1]
if x[:2] == "PC":
return "PC"
return ""
def keep_row(row):
return not any(
[
word.lower() in row[RECAPITULATIF_DES_OPERATION].lower()
for word in ["TOTAL", "TOTAUX", "Solde créditeur", "Solde débiteur"]
]
)
def extract(table, additionnal_fields: dict = {}):
"""Turn table to dictionary with additionnal fields"""
extracted = []
header = table[0]
for row in table[1:]:
if keep_row(row):
r = dict()
for i, value in enumerate(row):
if header[i] == "":
r["Fournisseur"] = value
else:
r[header[i]] = value
for k, v in additionnal_fields.items():
r[k] = v
r["lot"] = get_lot(row[RECAPITULATIF_DES_OPERATION])
if "honoraire" in row[RECAPITULATIF_DES_OPERATION]:
r["Fournisseur"] = "IMI GERANCE"
extracted.append(r)
return extracted
def table2df(tables):
dfs = []
for table in tables:
df = (
pd.DataFrame.from_records(table)
.replace("", np.nan)
.dropna(subset=["Débits", "Crédits"], how="all")
)
df["Fournisseur"] = df["Fournisseur"].fillna(method="ffill")
dfs.append(df)
return pd.concat(dfs)

View File

@ -0,0 +1,134 @@
import pandas as pd
def is_it(page_text):
if "SITUATION DES LOCATAIRES" in page_text:
return True
return False
def is_drop(row):
if "totaux" in row[0].lower():
return True
if not any(row):
return True
return False
def extract(table, additionnal_fields: dict = {}):
"""Turn table to dictionary with additionnal fields"""
extracted = []
header = table[0]
for row in table[1:]:
if not is_drop(row):
r = dict()
for i, value in enumerate(row):
if header[i] != "":
r[header[i]] = value
for k, v in additionnal_fields.items():
r[k] = v
extracted.append(r)
return extracted
def join_row(last, next):
row = {}
for key in last:
if last[key] == next[key]:
row[key] = last[key]
elif last[key] and next[key]:
row[key] = f"{last[key]}\n{next[key]}"
elif last[key]:
row[key] = last[key]
elif next[key]:
row[key] = next[key]
else:
row[key] = ""
return row
def join_tables(tables):
joined = tables[0]
for t in tables[1:]:
last_row = joined[-1]
if "totaux" not in last_row["Locataires"].lower():
first_row = t[0]
joined_row = join_row(last_row, first_row)
joined = joined[:-1] + [joined_row] + t[1:]
else:
joined += t
return joined
def parse_lot(string):
words = string.split(" ")
return {"Lot": words[1], "Type": " ".join(words[2:])}
def join_row(table):
joined = []
for row in table:
if row["Locataires"].startswith("Lot"):
row.update(parse_lot(row["Locataires"]))
row["Locataires"] = ""
joined.append(row)
elif row["Locataires"] == "Rappel de Loyer":
last_row = joined[-1]
row.update(
{
"Lot": last_row["Lot"],
"Type": last_row["Type"],
"Locataires": last_row["Locataires"],
"Divers": "Rappel de Loyer",
}
)
joined.append(row)
elif row["Locataires"]:
last_row = joined.pop()
row_name = row["Locataires"].replace("\n", " ")
row.update({k: v for k, v in last_row.items() if v})
row["Locataires"] = last_row["Locataires"] + " " + row_name
joined.append(row)
else:
if row["Période"].startswith("Solde"):
last_row = joined.pop()
row.update(
{
"Lot": last_row["Lot"],
"Type": last_row["Type"],
"Locataires": last_row["Locataires"],
}
)
joined.append(row)
elif row["Période"].startswith("Du"):
last_row = joined[-1]
row.update(
{
"Lot": last_row["Lot"],
"Type": last_row["Type"],
"Locataires": last_row["Locataires"],
}
)
joined.append(row)
else:
print(row)
return joined
def flat_tables(tables):
tables_flat = []
for table in tables:
tables_flat.extend(table)
return tables_flat
def table2df(tables):
tables = flat_tables(tables)
joined = join_row(tables)
return pd.DataFrame.from_records(joined)

View File

@ -0,0 +1,4 @@
def is_it(page_text):
if "VOTRE PATRIMOINE" in page_text:
return True
return False

View File

@ -0,0 +1,34 @@
import numpy as np
import pandas as pd
def is_it(page_text):
if "COMPTE RENDU DE GESTION" in page_text:
return True
return False
def extract(table, additionnal_fields: dict = {}):
"""Extract "remise commercial" from first page"""
extracted = []
header = table[0]
for row in table[1:]:
if "Remise commerciale gérance" in row:
r = dict()
for i, value in enumerate(row):
r[header[i]] = value
for k, v in additionnal_fields.items():
r[k] = v
extracted.append(r)
return extracted
# df = pd.DataFrame(table[1:], columns=table[0]).replace("", np.nan)
# df = df[
# df["RECAPITULATIF DES OPERATIONS"].str.contains(
# "Remise commerciale gérance", case=False, na=False
# )
# ]
#
# df.columns.values[0] = "Fournisseur"
# return df