Feat: use fsm to extract lines from pdf

This commit is contained in:
Bertrand Benjamin 2025-02-26 05:54:44 +01:00
parent 6e0ffe9085
commit ce8cdc4c1e
6 changed files with 1085 additions and 781 deletions

File diff suppressed because one or more lines are too long

View File

@ -1,10 +1,11 @@
import logging
from datetime import datetime
from pathlib import Path
import pandas as pd
import pdfplumber
from pdf_oralia.pages import charge, locataire, patrimoine, recapitulatif
from pdf_oralia.pages import charge, locataire, patrimoine
extract_table_settings = {
"vertical_strategy": "lines",
@ -32,21 +33,16 @@ def extract_building(page_text, buildings=["bloch", "marietton", "servient"]):
raise ValueError("Pas d'immeuble trouvé")
def catch_malformed_table(tables):
    """Return the page table, re-joining it when it came back in two pieces.

    When exactly two tables are given they are concatenated in order; for
    any other count only the first table is returned.
    """
    first = tables[0]
    if len(tables) != 2:
        return first
    return first + tables[1]
def pdf_extract_tables_lines(pdf):
loc_sink = locataire.fsm()
next(loc_sink)
charge_sink = charge.fsm()
next(charge_sink)
patrimoine_sink = patrimoine.fsm()
next(patrimoine_sink)
def from_pdf(pdf_file):
"""Build dataframes one about charges and another on loc"""
pdf = pdfplumber.open(pdf_file)
recapitulatif_tables = []
loc_tables = []
charge_tables = []
patrimoie_tables = []
for page_number, page in enumerate(pdf.pages):
page_number = 1
for page in pdf.pages:
page_text = page.extract_text()
date = extract_date(page_text)
additionnal_fields = {
@ -55,34 +51,50 @@ def from_pdf(pdf_file):
"annee": date.strftime("%Y"),
}
if recapitulatif.is_it(page_text):
table = page.extract_tables()[0]
extracted = recapitulatif.extract(table, additionnal_fields)
if extracted:
recapitulatif_tables.append(extracted)
elif locataire.is_it(page_text):
tables = page.extract_tables(extract_table_settings)[1:]
table = catch_malformed_table(tables)
extracted = locataire.extract(table, additionnal_fields)
loc_tables.append(extracted)
for line in page.extract_table(extract_table_settings):
if locataire.is_it(page_text):
res = loc_sink.send(line)
if res:
res.update(additionnal_fields)
yield locataire.Line(**res)
elif charge.is_it(page_text):
tables = page.extract_tables(extract_table_settings)[1:]
table = catch_malformed_table(tables)
extracted = charge.extract(table, additionnal_fields)
charge_tables.append(extracted)
res = charge_sink.send(line)
if res:
res.update(additionnal_fields)
yield charge.Line(**res)
elif patrimoine.is_it(page_text):
pass
res = patrimoine_sink.send(line)
if res:
res.update(additionnal_fields)
yield patrimoine.Line(**res)
else:
logging.warning(f"Page {page_number} non reconnu. Page ignorée.")
page_number += 1
def from_pdf(pdf_file):
"""Build dataframes one about charges and another on loc"""
pdf = pdfplumber.open(pdf_file)
locataire_lines = []
charge_lines = []
patrimoine_lines = []
for line in pdf_extract_tables_lines(pdf):
if isinstance(line, locataire.Line):
locataire_lines.append(line)
elif isinstance(line, charge.Line):
charge_lines.append(line)
elif isinstance(line, patrimoine.Line):
patrimoine_lines.append(line)
else:
logging.warning(f"Page {page_number+1} non reconnu. Page ignorée.")
df_charge = charge.table2df(recapitulatif_tables + charge_tables)
df_loc = locataire.table2df(loc_tables)
return df_charge, df_loc
return (
pd.DataFrame([c.__dict__ for c in charge_lines]),
pd.DataFrame([c.__dict__ for c in locataire_lines]),
pd.DataFrame([c.__dict__ for c in patrimoine_lines]),
)
def extract_save(pdf_file, dest):
@ -90,10 +102,13 @@ def extract_save(pdf_file, dest):
pdf_file = Path(pdf_file)
xls_charge = Path(dest) / f"{pdf_file.stem.replace(' ', '_')}_charge.xlsx"
xls_locataire = Path(dest) / f"{pdf_file.stem.replace(' ', '_')}_locataire.xlsx"
xls_patrimoine = Path(dest) / f"{pdf_file.stem.replace(' ', '_')}_patrimoine.xlsx"
df_charge, df_loc = from_pdf(pdf_file)
df_charge, df_loc, df_patrimoine = from_pdf(pdf_file)
df_charge.to_excel(xls_charge, sheet_name="Charges", index=False)
logging.info(f"{xls_charge} saved")
df_loc.to_excel(xls_locataire, sheet_name="Location", index=False)
logging.info(f"{xls_locataire} saved")
df_patrimoine.to_excel(xls_patrimoine, sheet_name="Patrimoine", index=False)
logging.info(f"{xls_patrimoine} saved")

View File

@ -1,9 +1,16 @@
import re
from pydantic import BaseModel, field_validator
import numpy as np
import pandas as pd
RECAPITULATIF_DES_OPERATIONS = 1
# Expected header row of the charge table. fsm() compares every incoming
# line against this list and skips a line that is equal to it.
HEADER_CHARGE = [
"",
"RECAPITULATIF DES OPERATIONS",
"Débits",
"Crédits",
"Dont T.V.A.",
"Locatif",
"Déductible",
]
DF_TYPES = {
"Fournisseur": str,
"RECAPITULATIF DES OPERATIONS": str,
@ -17,7 +24,30 @@ DF_TYPES = {
"annee": str,
"lot": str,
}
DEFAULT_FOURNISSEUR = "ROSIER MODICA MOTTEROZ SA"
class Line(BaseModel):
    """Validated record for one charge line.

    The amount fields (Débit, Crédits, Dont_TVA, Locatif, Déductible) accept
    an empty string as input, which is turned into 0 before validation.
    """

    mois: int
    annee: int
    immeuble: str
    lot: str
    Champs: str
    Categorie: str
    Fournisseur: str
    Libellé: str
    Débit: float
    Crédits: float
    Dont_TVA: float
    Locatif: float
    Déductible: float

    @field_validator(
        "Débit", "Crédits", "Dont_TVA", "Locatif", "Déductible", mode="before"
    )
    @classmethod
    def set_default_if_empty(cls, v):
        """Replace an empty-string amount with 0 before type validation."""
        # mode="before" means this runs on the raw input, so "" never
        # reaches the float parser.
        if v == "":
            return 0
        return v
def is_it(page_text):
@ -41,51 +71,54 @@ def get_lot(txt):
return "*"
def keep_row(row):
return not any(
[
word.lower() in row[RECAPITULATIF_DES_OPERATIONS].lower()
for word in ["TOTAL", "TOTAUX", "Solde créditeur", "Solde débiteur"]
]
def fsm():
current_state = "total"
row = {}
line = yield
while True:
if line == HEADER_CHARGE:
line = yield
if current_state == "total":
if line[1].lower().split(" ")[0] in ["total", "totaux"]:
current_state = "new_champs"
line = yield
elif current_state == "new_champs":
if line[0] != "":
current_state = "new_cat_line"
row = {"Champs": line[0], "Categorie": "", "Fournisseur": ""}
line = yield
elif current_state == "new_cat_line":
if line[1].lower().split(" ")[0] in ["total", "totaux"]:
current_state = "new_champs"
line = yield
row = {}
elif line[2] != "" or line[3] != "":
row.update(
{
"Fournisseur": line[0] if line[0] != "" else row["Fournisseur"],
"Libellé": line[1],
"lot": get_lot(line[1]),
"Débit": line[2],
"Crédits": line[3],
"Dont_TVA": line[4],
"Locatif": line[5],
"Déductible": line[6],
}
)
def extract(table, additionnal_fields: dict = {}):
"""Turn table to dictionary with additional fields"""
extracted = []
header = table[0]
for row in table[1:]:
if keep_row(row):
r = dict()
for i, value in enumerate(row):
if header[i] == "":
r["Fournisseur"] = value
line = yield row
row = {
"Champs": row["Champs"],
"Categorie": row["Categorie"],
"Fournisseur": row["Fournisseur"],
}
elif line[0] != "" and line[1] == "":
row.update({"Categorie": line[0]})
line = yield
elif line[1] != "":
row.update({"Categorie": line[1]})
line = yield
elif line[0] != "":
row.update({"Fournisseur": line[0]})
line = yield
else:
r[header[i]] = value
for k, v in additionnal_fields.items():
r[k] = v
if "honoraire" in row[RECAPITULATIF_DES_OPERATIONS].lower():
r["Fournisseur"] = DEFAULT_FOURNISSEUR
extracted.append(r)
return extracted
def table2df(tables):
dfs = []
for table in tables:
df = (
pd.DataFrame.from_records(table)
.replace("", np.nan)
.dropna(subset=["Débits", "Crédits"], how="all")
)
df["Fournisseur"] = df["Fournisseur"].fillna(method="ffill")
dfs.append(df)
df = pd.concat(dfs)
df["immeuble"] = df["immeuble"].apply(lambda x: x[0].capitalize())
df["lot"] = df["RECAPITULATIF DES OPERATIONS"].apply(get_lot)
return df.astype(DF_TYPES)
line = yield

View File

@ -1,22 +1,48 @@
import numpy as np
import pandas as pd
from pydantic import BaseModel, field_validator
# Column name -> dtype mapping. table2df() casts the final dataframe with it
# (DataFrame.astype) and uses the float entries to pick the columns whose
# empty strings are replaced by NaN beforehand.
DF_TYPES = {
"Locataires": str,
"Période": str,
"Loyers": float,
"Taxes": float,
"Provisions": float,
"Divers": str,
"Total": float,
"Réglés": float,
"Impayés": float,
"immeuble": str,
"mois": str,
"annee": str,
"Lot": str,
"Type": str,
}
# Expected header row of the tenant ("locataire") table. fsm() compares every
# incoming line against this list and skips a line that is equal to it.
HEADER_LOC = [
"Locataires",
"Période",
"Loyers",
"Taxes",
"Provisions",
"Divers",
"",
"Total",
"Réglés",
"Impayés",
]
class Line(BaseModel):
    """Validated record for one tenant ("locataire") line.

    The amount fields (Loyers, Taxes, Provisions, Divers, Total, Réglés,
    Impayés) accept an empty string as input, which is turned into 0 before
    validation.
    """

    mois: int
    annee: int
    immeuble: str
    Lot: str
    Type: str
    Locataire: str
    Loyers: float
    Taxes: float
    Provisions: float
    Divers: float
    Total: float
    Réglés: float
    Impayés: float

    @field_validator(
        "Loyers",
        "Taxes",
        "Provisions",
        "Divers",
        "Total",
        "Réglés",
        "Impayés",
        mode="before",
    )
    @classmethod
    def set_default_if_empty(cls, v):
        """Replace an empty-string amount with 0 before type validation."""
        # mode="before" means this runs on the raw input, so "" never
        # reaches the float parser.
        if v == "":
            return 0
        return v
def is_it(page_text):
@ -25,142 +51,43 @@ def is_it(page_text):
return False
def is_drop(row):
    """Tell whether a table row must be discarded.

    A row is dropped when none of its cells is truthy (this includes a row
    with no cells at all) or when its first cell contains "totaux",
    case-insensitively.

    Args:
        row: sequence of cell values for one table row.

    Returns:
        True when the row must be skipped, False otherwise.
    """
    # Emptiness is tested before touching row[0] so that a row without any
    # cell is dropped instead of raising IndexError.
    if not any(row):
        return True
    # A missing (None) first cell cannot be a totals row.
    first_cell = row[0] or ""
    return "totaux" in first_cell.lower()
def extract(table, additionnal_fields: "dict | None" = None):
    """Turn a raw table into a list of row dictionaries.

    The first row of ``table`` is used as the header. Every following row
    that is_drop() does not reject becomes a dict mapping header names to
    cell values; cells under an empty header name are ignored.

    Args:
        table: list of rows (lists of cells), header first. An empty table
            yields an empty result.
        additionnal_fields: extra key/value pairs added to every row; they
            override a table column with the same name.

    Returns:
        The list of row dictionaries, in table order.
    """
    if not table:
        return []
    # None instead of a shared mutable default; behaves like the former {}.
    extra = additionnal_fields or {}
    header, *rows = table
    extracted = []
    for row in rows:
        if is_drop(row):
            continue
        record = {
            header[i]: value for i, value in enumerate(row) if header[i] != ""
        }
        record.update(extra)
        extracted.append(record)
    return extracted
def join_row(last, next):
    """Merge two row dictionaries key by key, driven by the keys of ``last``.

    For each key: equal values are kept once, two different non-empty values
    are joined with a newline, otherwise the non-empty one wins and "" is
    used when both are empty.
    """
    merged = {}
    for key, previous in last.items():
        following = next[key]
        if previous == following:
            merged[key] = previous
        elif previous and following:
            merged[key] = f"{previous}\n{following}"
        else:
            # At most one side is non-empty here.
            merged[key] = previous or following or ""
    return merged
def join_tables(tables):
    """Concatenate per-page tables, stitching rows split across two pages.

    When the last row gathered so far is not a "totaux" row, it is merged
    with the first row of the next table through join_row(); otherwise the
    next table is simply appended.

    Args:
        tables: list of tables, each a list of row dicts holding a
            "Locataires" key.

    Returns:
        A new list with all rows; the input tables are left untouched.
        An empty ``tables`` gives an empty list.
    """
    if not tables:
        return []
    # Work on a copy: extending tables[0] itself would modify the caller's
    # first table as a side effect.
    joined = list(tables[0])
    for t in tables[1:]:
        if not t:
            # Nothing to stitch or append for an empty page.
            continue
        if joined and "totaux" not in joined[-1]["Locataires"].lower():
            joined[-1] = join_row(joined[-1], t[0])
            joined.extend(t[1:])
        else:
            joined.extend(t)
    return joined
def parse_lot(string):
    """Split a lot label into its zero-padded number and its type.

    The label is split on single spaces: the second word is parsed as the
    integer lot number (padded to two digits), and all following words,
    re-joined with spaces, form the type.
    """
    words = string.split(" ")
    lot_number = int(words[1])
    lot_type = " ".join(words[2:])
    return {"Lot": f"{lot_number:02d}", "Type": lot_type}
def clean_type(string):
    """Reduce a type containing "appartement" to its last two characters.

    The match is case-insensitive; any other type is returned unchanged.
    """
    is_flat = "appartement" in string.lower()
    return string[-2:] if is_flat else string
def join_row(table):
joined = []
for row in table:
if row["Locataires"].startswith("Lot"):
row.update(parse_lot(row["Locataires"]))
row["Locataires"] = ""
joined.append(row)
elif row["Locataires"] == "Rappel de Loyer":
last_row = joined[-1]
def fsm():
current_state = "new_row"
row = {}
line = yield
while True:
if line == HEADER_LOC:
line = yield
elif current_state == "new_row":
if line[0] != "" and line[0] != "TOTAUX":
row.update(parse_lot(line[0]))
current_state = "add_loc"
line = yield
elif current_state == "add_loc":
if line[0] != "":
row["Locataire"] = line[0]
current_state = "add_totaux"
line = yield
elif current_state == "add_totaux":
if line[0] == "Totaux":
row.update(
{
"Lot": last_row["Lot"],
"Type": last_row["Type"],
"Locataires": last_row["Locataires"],
"Divers": "Rappel de Loyer",
"Loyers": line[2],
"Taxes": line[3],
"Provisions": line[4],
"Divers": line[5],
"Total": line[7],
"Réglés": line[8],
"Impayés": line[9],
}
)
joined.append(row)
elif row["Locataires"]:
last_row = joined.pop()
row_name = row["Locataires"].replace("\n", " ")
row.update({k: v for k, v in last_row.items() if v})
row["Locataires"] = last_row["Locataires"] + " " + row_name
joined.append(row)
line = yield row
row = {}
current_state = "new_row"
else:
if row["Période"].startswith("Solde"):
last_row = joined.pop()
row.update(
{
"Lot": last_row["Lot"],
"Type": last_row["Type"],
"Locataires": last_row["Locataires"],
}
)
joined.append(row)
elif row["Période"].startswith("Du"):
last_row = joined[-1]
row.update(
{
"Lot": last_row["Lot"],
"Type": last_row["Type"],
"Locataires": last_row["Locataires"],
}
)
joined.append(row)
return joined
def flat_tables(tables):
    """Flatten a list of tables into a single list of rows, order preserved."""
    return [row for table in tables for row in table]
def table2df(tables):
    """Build the tenant dataframe from the extracted per-page tables.

    The tables are flattened and passed through join_row(), then: the
    building name is reduced to its capitalized first item, the type is
    shortened with clean_type(), empty strings in float columns become NaN,
    rows with both "Locataires" and "Période" empty are removed, and the
    result is cast to DF_TYPES.
    """
    rows = join_row(flat_tables(tables))
    df = pd.DataFrame.from_records(rows)
    df["immeuble"] = df["immeuble"].apply(lambda name: name[0].capitalize())
    df["Type"] = df["Type"].apply(clean_type)
    float_columns = [column for column, dtype in DF_TYPES.items() if dtype == float]
    df[float_columns] = df[float_columns].replace("", np.nan)
    blank_rows = df[(df["Locataires"] == "") & (df["Période"] == "")]
    df = df.drop(blank_rows.index)
    return df.astype(DF_TYPES)
line = yield

View File

@ -1,4 +1,74 @@
from pydantic import BaseModel, field_validator
# Expected header row of the "patrimoine" table. fsm() compares every incoming
# line against this list and skips a line that is equal to it.
HEADER_PATRIMOINE = [
"Etage",
"Lots",
"Type de lot",
"Nom du Locataire",
"Loyer Annuel",
"Début Bail",
"Fin Bail",
"Entrée",
"Départ",
"Révisé le",
"U",
"Dépôt Gar.",
]
class Line(BaseModel):
    """Validated record for one "patrimoine" line.

    Loyer_annuel and Depot_garantie accept an empty string as input, which
    is turned into 0 before validation.
    """

    mois: int
    annee: int
    immeuble: str
    Etage: str
    Lot: str
    Type: str
    Locataire: str
    Loyer_annuel: int
    Debut_bail: str
    Fin_bail: str
    Entree: str
    Depart: str
    Revision_bail: str
    Usage: str
    Depot_garantie: float

    @field_validator("Loyer_annuel", "Depot_garantie", mode="before")
    @classmethod
    def set_default_if_empty(cls, v):
        """Replace an empty-string amount with 0 before type validation."""
        # mode="before" means this runs on the raw input, so "" never
        # reaches the numeric parser.
        if v == "":
            return 0
        return v
def is_it(page_text):
    """Return True when the page text contains "VOTRE PATRIMOINE"."""
    return "VOTRE PATRIMOINE" in page_text
def fsm():
    """Coroutine turning raw "patrimoine" table lines into row dictionaries.

    Usage: create it, prime it with next(), then send() one table line (a
    list of cells) at a time.

    Each send() returns:
        * a dict with the keys Etage, Lot, Type, Locataire, Loyer_annuel,
          Debut_bail, Fin_bail, Entree, Depart, Revision_bail, Usage and
          Depot_garantie when the line's first cell is not empty;
        * None for a line equal to HEADER_PATRIMOINE and for a line whose
          first cell is empty.

    An empty lot or type cell inherits the value of the previously emitted
    row, or "" when no row has been emitted yet. Spaces are removed from the
    yearly rent and deposit cells.
    """
    previous = {}
    emitted = None
    while True:
        # Single suspension point: hand back the result of the last line and
        # wait for the next one. Every line, including the one following a
        # header, therefore goes through the same checks.
        line = yield emitted
        emitted = None
        if line == HEADER_PATRIMOINE or line[0] == "":
            continue
        previous = {
            "Etage": line[0],
            # Only the last two characters of the lot cell are kept.
            "Lot": line[1][-2:] if line[1] != "" else previous.get("Lot", ""),
            "Type": line[2] if line[2] != "" else previous.get("Type", ""),
            "Locataire": line[3],
            "Loyer_annuel": line[4].replace(" ", ""),
            "Debut_bail": line[5],
            "Fin_bail": line[6],
            "Entree": line[7],
            "Depart": line[8],
            "Revision_bail": line[9],
            "Usage": line[10],
            "Depot_garantie": line[11].replace(" ", ""),
        }
        emitted = previous

View File

@ -1,3 +1,5 @@
pdfplumber
numpy
pandas
click
openpyxl