Compare commits

..

No commits in common. "rework_extract" and "main" have entirely different histories.

8 changed files with 811 additions and 1226 deletions

File diff suppressed because one or more lines are too long

View File

@ -1,23 +1,3 @@
# PDF AURALIA
Extraction de fichiers de comptabilité en pdf vers xlsx.
## Utilisation
- Lancement sur un fichier pdf particulier
```bash
pdf_oralia extract on <pdf_file> --dest <where to put produced files>
```
- Lancement sur tous les fichiers d'un répertoire (récursivement)
```bash
pdf_oralia extract all --src <source folder> --dest <destination folder>
```
Cette commande reproduira la structure du dossier source dans le dossier de destination. Seuls les fichiers non existants seront traités. Par défaut, les fichiers déjà produits ne seront pas écrasés.
On peut ajouter les options suivantes:
- `--force`: pour écraser les fichiers déjà traités
- `--only-plan`: pour voir quels fichiers seraient créés, sans les créer.

View File

@ -1,11 +1,10 @@
import logging
from datetime import datetime
from pathlib import Path
import pandas as pd
import pdfplumber
from pdf_oralia.pages import charge, locataire, patrimoine
from pdf_oralia.pages import charge, locataire, patrimoine, recapitulatif
extract_table_settings = {
"vertical_strategy": "lines",
@ -13,10 +12,6 @@ extract_table_settings = {
}
class ExtractError(Exception):
    """Exception type declared by the extraction module; adds no behavior."""
def extract_date(page_text):
"""Extract date from a page
@ -37,100 +32,68 @@ def extract_building(page_text, buildings=["bloch", "marietton", "servient"]):
raise ValueError("Pas d'immeuble trouvé")
def pdf_extract_tables_lines(pdf):
    """Yield one validated ``Line`` object per data row found in *pdf*.

    Each page is classified as a locataire, charge or patrimoine page (tested
    in that order).  The rows of its table are fed to the matching state
    machine, and every row dict the machine emits is completed with the
    page's ``immeuble``, ``mois`` and ``annee`` before being wrapped in that
    module's ``Line`` model.

    Pages whose building or type cannot be identified are skipped with a
    warning.  Pages on which pdfplumber finds no table are skipped silently.

    Args:
        pdf: an opened pdfplumber document (anything exposing ``pages``).

    Yields:
        ``locataire.Line``, ``charge.Line`` or ``patrimoine.Line`` instances.
    """
    # One primed state machine per page type.  They live for the whole
    # document so a logical row may span a page break.
    parsers = []
    for module in (locataire, charge, patrimoine):
        sink = module.fsm()
        next(sink)
        parsers.append((module, sink))

    for page_number, page in enumerate(pdf.pages):
        page_text = page.extract_text()
        date = extract_date(page_text)
        try:
            additionnal_fields = {
                "immeuble": extract_building(page_text),
                "mois": date.strftime("%m"),
                "annee": date.strftime("%Y"),
            }
        except ValueError:
            logging.warning(
                f"L'immeuble de la page {page_number+1} non identifiable. Page ignorée."
            )
            continue

        # First module that recognises the page wins (same priority as before).
        match = next((p for p in parsers if p[0].is_it(page_text)), None)
        if match is None:
            logging.warning(
                f"Type de la page {page_number+1} non identifiable. Page ignorée."
            )
            continue
        module, sink = match

        # extract_table() returns None when no table is detected on the page;
        # iterating over None would raise TypeError.
        for line in page.extract_table(extract_table_settings) or []:
            res = sink.send(line)
            if res:
                res.update(additionnal_fields)
                yield module.Line(**res)
def catch_malformed_table(tables):
    """Return the first table, with the second appended when there are exactly two.

    Any other number of tables yields the first one unchanged.
    """
    first = tables[0]
    return first + tables[1] if len(tables) == 2 else first
def from_pdf(pdf_file):
"""Build dataframes one about charges and another on loc"""
pdf = pdfplumber.open(pdf_file)
locataire_lines = []
charge_lines = []
patrimoine_lines = []
for line in pdf_extract_tables_lines(pdf):
if isinstance(line, locataire.Line):
locataire_lines.append(line)
elif isinstance(line, charge.Line):
charge_lines.append(line)
elif isinstance(line, patrimoine.Line):
patrimoine_lines.append(line)
recapitulatif_tables = []
loc_tables = []
charge_tables = []
patrimoie_tables = []
for page_number, page in enumerate(pdf.pages):
page_text = page.extract_text()
date = extract_date(page_text)
additionnal_fields = {
"immeuble": extract_building(page_text),
"mois": date.strftime("%m"),
"annee": date.strftime("%Y"),
}
if recapitulatif.is_it(page_text):
table = page.extract_tables()[0]
extracted = recapitulatif.extract(table, additionnal_fields)
if extracted:
recapitulatif_tables.append(extracted)
elif locataire.is_it(page_text):
tables = page.extract_tables(extract_table_settings)[1:]
table = catch_malformed_table(tables)
extracted = locataire.extract(table, additionnal_fields)
loc_tables.append(extracted)
elif charge.is_it(page_text):
tables = page.extract_tables(extract_table_settings)[1:]
table = catch_malformed_table(tables)
extracted = charge.extract(table, additionnal_fields)
charge_tables.append(extracted)
elif patrimoine.is_it(page_text):
pass
else:
logging.warning(f"Page {page_number+1} non reconnu. Page ignorée.")
return {
"charge": pd.DataFrame([c.__dict__ for c in charge_lines]),
"locataire": pd.DataFrame([c.__dict__ for c in locataire_lines]),
"patrimoine": pd.DataFrame([c.__dict__ for c in patrimoine_lines]),
}
df_charge = charge.table2df(recapitulatif_tables + charge_tables)
df_loc = locataire.table2df(loc_tables)
return df_charge, df_loc
def extract_plan(pdf_file, dest):
return {
"charge": Path(dest) / f"{pdf_file.stem.replace(' ', '_')}_charge.xlsx",
"locataire": Path(dest) / f"{pdf_file.stem.replace(' ', '_')}_locataire.xlsx",
"patrimoine": Path(dest) / f"{pdf_file.stem.replace(' ', '_')}_patrimoine.xlsx",
}
def extract_save(pdf_file, dest, save=[]):
def extract_save(pdf_file, dest):
"""Extract charge and locataire for pdf_file and put xlsx file in dest"""
pdf_file = Path(pdf_file)
xlss = extract_plan(pdf_file, dest)
xls_charge = Path(dest) / f"{pdf_file.stem.replace(' ', '_')}_charge.xlsx"
xls_locataire = Path(dest) / f"{pdf_file.stem.replace(' ', '_')}_locataire.xlsx"
dfs = from_pdf(pdf_file)
df_charge, df_loc = from_pdf(pdf_file)
for s in save:
dfs[s].to_excel(xlss[s], sheet_name=s, index=False)
logging.info(f"{xlss[s]} saved")
return {k: v for k, v in xlss.items() if k in save}
df_charge.to_excel(xls_charge, sheet_name="Charges", index=False)
logging.info(f"{xls_charge} saved")
df_loc.to_excel(xls_locataire, sheet_name="Location", index=False)
logging.info(f"{xls_locataire} saved")

View File

@ -1,16 +1,9 @@
import re
from pydantic import BaseModel, field_validator
import numpy as np
import pandas as pd
# Header row of the charges table; fsm() skips any extracted line equal to it.
HEADER_CHARGE = [
"",
"RECAPITULATIF DES OPERATIONS",
"Débits",
"Crédits",
"Dont T.V.A.",
"Locatif",
"Déductible",
]
# Index, within a table row, of the operation-label cell read by keep_row() and extract().
RECAPITULATIF_DES_OPERATIONS = 1
DF_TYPES = {
"Fournisseur": str,
"RECAPITULATIF DES OPERATIONS": str,
@ -24,30 +17,7 @@ DF_TYPES = {
"annee": str,
"lot": str,
}
class Line(BaseModel):
    """Pydantic model for one row of the charges table."""

    mois: int
    annee: int
    immeuble: str
    lot: str
    Champs: str
    Categorie: str
    Fournisseur: str
    Libellé: str
    Débit: float
    Crédits: float
    Dont_TVA: float
    Locatif: float
    Déductible: float

    @field_validator(
        "Débit", "Crédits", "Dont_TVA", "Locatif", "Déductible", mode="before"
    )
    def set_default_if_empty(cls, v):
        """Turn an empty-string amount into 0 before type validation."""
        return 0 if v == "" else v
# Supplier that extract() assigns to rows whose operation label contains "honoraire".
DEFAULT_FOURNISSEUR = "ROSIER MODICA MOTTEROZ SA"
def is_it(page_text):
@ -71,54 +41,51 @@ def get_lot(txt):
return "*"
def fsm():
    """State machine (generator) turning charges-table lines into row dicts.

    Prime it with ``next()``, then ``send()`` each extracted table line (a
    list of cells).  Every ``send()`` returns ``None``, except for a line that
    carries a debit or credit amount: that one returns a dict with the keys
    ``Champs``, ``Categorie``, ``Fournisseur``, ``Libellé``, ``lot``,
    ``Débit``, ``Crédits``, ``Dont_TVA``, ``Locatif`` and ``Déductible``.

    States:
        total         -- ignore lines until one whose label starts with
                         "total"/"totaux".
        new_champs    -- wait for a non-empty first cell, which opens a new
                         ``Champs`` section.
        new_cat_line  -- inside a section: track category / supplier lines
                         and emit amount lines.
    """
    # NOTE(review): nesting below was restored from a listing that had lost
    # its indentation — confirm against the original file.
    current_state = "total"
    row = {}
    line = yield
    while True:
        # A repeated header is consumed; the line received next is then
        # processed in this same iteration (plain `if`, not `elif`, below).
        if line == HEADER_CHARGE:
            line = yield
        if current_state == "total":
            if line[1].lower().split(" ")[0] in ["total", "totaux"]:
                current_state = "new_champs"
            line = yield
        elif current_state == "new_champs":
            if line[0] != "":
                current_state = "new_cat_line"
                row = {"Champs": line[0], "Categorie": "", "Fournisseur": ""}
            line = yield
        elif current_state == "new_cat_line":
            if line[1].lower().split(" ")[0] in ["total", "totaux"]:
                # A total line closes the current section.
                current_state = "new_champs"
                line = yield
                row = {}
            elif line[2] != "" or line[3] != "":
                # Amount line: complete the row and hand it to the caller.
                row.update(
                    {
                        # An empty supplier cell keeps the last supplier seen.
                        "Fournisseur": line[0] if line[0] != "" else row["Fournisseur"],
                        "Libellé": line[1],
                        "lot": get_lot(line[1]),
                        "Débit": line[2],
                        "Crédits": line[3],
                        "Dont_TVA": line[4],
                        "Locatif": line[5],
                        "Déductible": line[6],
                    }
                )
                line = yield row
                # Rebind to a fresh dict that keeps only the section context,
                # so the dict just yielded is not modified afterwards.
                row = {
                    "Champs": row["Champs"],
                    "Categorie": row["Categorie"],
                    "Fournisseur": row["Fournisseur"],
                }
            elif line[0] != "" and line[1] == "":
                row.update({"Categorie": line[0]})
                line = yield
            elif line[1] != "":
                row.update({"Categorie": line[1]})
                line = yield
            elif line[0] != "":
                row.update({"Fournisseur": line[0]})
                line = yield
            else:
                line = yield
def keep_row(row):
    """Return False for total / balance rows, True for every other row.

    The check is a case-insensitive substring search in the row's
    operation-label cell.
    """
    for word in ("TOTAL", "TOTAUX", "Solde créditeur", "Solde débiteur"):
        if word.lower() in row[RECAPITULATIF_DES_OPERATIONS].lower():
            return False
    return True
def extract(table, additionnal_fields: "dict | None" = None):
    """Turn a charges table into a list of row dicts.

    Args:
        table: list of rows (lists of cells); ``table[0]`` is the header.
        additionnal_fields: extra key/value pairs copied into every row.
            Defaults to no extra fields.

    Returns:
        One dict per row accepted by ``keep_row``, keyed by header cell.
        The column whose header cell is empty is stored under
        ``"Fournisseur"``.  Rows whose operation label contains "honoraire"
        get ``DEFAULT_FOURNISSEUR`` as supplier.
    """
    # `None` replaces the former `{}` default: a mutable default is shared
    # between calls.
    extra = additionnal_fields or {}
    header = table[0]
    extracted = []
    for row in table[1:]:
        if not keep_row(row):
            continue
        r = {}
        for i, value in enumerate(row):
            key = "Fournisseur" if header[i] == "" else header[i]
            r[key] = value
        r.update(extra)
        if "honoraire" in row[RECAPITULATIF_DES_OPERATIONS].lower():
            r["Fournisseur"] = DEFAULT_FOURNISSEUR
        extracted.append(r)
    return extracted
def table2df(tables):
    """Build the charges DataFrame from the per-page extracted tables.

    Args:
        tables: list of tables, each a list of row dicts as produced by
            ``extract``.

    Returns:
        A single DataFrame cast to ``DF_TYPES``, with a ``lot`` column derived
        from the operation label and ``immeuble`` reduced to its capitalised
        first letter.
    """
    frames = []
    for table in tables:
        frame = (
            pd.DataFrame.from_records(table)
            .replace("", np.nan)
            # Rows with neither a debit nor a credit carry no amount.
            .dropna(subset=["Débits", "Crédits"], how="all")
        )
        # Propagate the last supplier seen down to the rows that omit it.
        # `.ffill()` replaces the deprecated `fillna(method="ffill")`.
        frame["Fournisseur"] = frame["Fournisseur"].ffill()
        frames.append(frame)
    df = pd.concat(frames)
    df["immeuble"] = df["immeuble"].apply(lambda x: x[0].capitalize())
    df["lot"] = df["RECAPITULATIF DES OPERATIONS"].apply(get_lot)
    return df.astype(DF_TYPES)

View File

@ -1,48 +1,22 @@
from pydantic import BaseModel, field_validator
import numpy as np
import pandas as pd
# Header row of the tenants table; fsm() skips any extracted line equal to it.
HEADER_LOC = [
"Locataires",
"Période",
"Loyers",
"Taxes",
"Provisions",
"Divers",
"",
"Total",
"Réglés",
"Impayés",
]
class Line(BaseModel):
    """Pydantic model for one row of the tenants table."""

    mois: int
    annee: int
    immeuble: str
    Lot: str
    Type: str
    Locataire: str
    Loyers: float
    Taxes: float
    Provisions: float
    Divers: float
    Total: float
    Réglés: float
    Impayés: float

    @field_validator(
        "Loyers",
        "Taxes",
        "Provisions",
        "Divers",
        "Total",
        "Réglés",
        "Impayés",
        mode="before",
    )
    def set_default_if_empty(cls, v):
        """Turn an empty-string amount into 0 before type validation."""
        return 0 if v == "" else v
# Column dtypes applied by table2df() through DataFrame.astype.
# The columns mapped to float are the ones table2df() treats as numeric
# (their empty strings become NaN).  "Divers" is str: join_row() stores the
# text "Rappel de Loyer" in it.
DF_TYPES = {
"Locataires": str,
"Période": str,
"Loyers": float,
"Taxes": float,
"Provisions": float,
"Divers": str,
"Total": float,
"Réglés": float,
"Impayés": float,
"immeuble": str,
"mois": str,
"annee": str,
"Lot": str,
"Type": str,
}
def is_it(page_text):
@ -51,56 +25,142 @@ def is_it(page_text):
return False
def is_drop(row):
    """Tell whether *row* must be discarded.

    A row is dropped when its first cell contains "totaux" (any case) or when
    none of its cells is truthy.
    """
    return "totaux" in row[0].lower() or not any(row)
def extract(table, additionnal_fields: "dict | None" = None):
    """Turn a tenants table into a list of row dicts.

    Args:
        table: list of rows (lists of cells); ``table[0]`` is the header.
        additionnal_fields: extra key/value pairs copied into every row.
            Defaults to no extra fields.

    Returns:
        One dict per row not rejected by ``is_drop``, keyed by header cell.
        Columns whose header cell is empty are left out.
    """
    # `None` replaces the former `{}` default: a mutable default is shared
    # between calls.
    extra = additionnal_fields or {}
    header = table[0]
    extracted = []
    for row in table[1:]:
        if is_drop(row):
            continue
        r = {header[i]: value for i, value in enumerate(row) if header[i] != ""}
        r.update(extra)
        extracted.append(r)
    return extracted
def join_row(last, next):
    """Merge two row dicts key by key, iterating over the keys of *last*.

    For each key: equal values are kept once, two different non-empty values
    are joined with a newline, a single non-empty value is kept, and two
    empty values give "".
    """
    merged = {}
    for key, before in last.items():
        after = next[key]
        if before == after:
            merged[key] = before
        elif before and after:
            merged[key] = f"{before}\n{after}"
        else:
            merged[key] = before or after or ""
    return merged
def join_tables(tables):
    """Concatenate per-page tables, stitching rows split across two tables.

    When the last accumulated row is not a "totaux" row, it is merged with
    the first row of the next table through ``join_row``; otherwise the next
    table is simply appended.
    """
    joined = tables[0]
    for following in tables[1:]:
        previous = joined[-1]
        if "totaux" in previous["Locataires"].lower():
            # In-place extension, as with the original `+=`.
            joined.extend(following)
            continue
        stitched = join_row(previous, following[0])
        joined = joined[:-1] + [stitched] + following[1:]
    return joined
def parse_lot(string):
    """Split a space-separated lot label into its number and type.

    The second word is the lot number (zero-padded to two digits); all
    following words form the type.
    """
    parts = string.split(" ")
    lot_number = int(parts[1])
    lot_type = " ".join(parts[2:])
    return {"Lot": f"{lot_number:02d}", "Type": lot_type}
def fsm():
    """State machine (generator) turning tenants-table lines into row dicts.

    Prime it with ``next()``, then ``send()`` each extracted table line (a
    list of cells).  Every ``send()`` returns ``None``, except for the
    "Totaux" line closing a lot: that one returns a dict with the keys
    ``Lot``, ``Type``, ``Locataire``, ``Loyers``, ``Taxes``, ``Provisions``,
    ``Divers``, ``Total``, ``Réglés`` and ``Impayés``.

    States:
        new_row     -- wait for a lot line (non-empty first cell other than
                       "TOTAUX") and parse it with ``parse_lot``.
        add_loc     -- wait for the tenant name (next non-empty first cell).
        add_totaux  -- wait for the lot's "Totaux" line and emit the row.
    """
    # NOTE(review): nesting below was restored from a listing that had lost
    # its indentation — confirm against the original file.
    current_state = "new_row"
    row = {}
    line = yield
    while True:
        if line == HEADER_LOC:
            line = yield
        elif current_state == "new_row":
            if line[0] != "" and line[0] != "TOTAUX":
                row.update(parse_lot(line[0]))
                current_state = "add_loc"
            line = yield
        elif current_state == "add_loc":
            if line[0] != "":
                row["Locataire"] = line[0]
                current_state = "add_totaux"
            line = yield
        elif current_state == "add_totaux":
            if line[0] == "Totaux":
                # When cell 6 is None the last three amounts are read one
                # cell further right (7, 8, 9 instead of 6, 7, 8).
                if line[6] is None:
                    row.update(
                        {
                            "Loyers": line[2],
                            "Taxes": line[3],
                            "Provisions": line[4],
                            "Divers": line[5],
                            "Total": line[7],
                            "Réglés": line[8],
                            "Impayés": line[9],
                        }
                    )
                else:
                    row.update(
                        {
                            "Loyers": line[2],
                            "Taxes": line[3],
                            "Provisions": line[4],
                            "Divers": line[5],
                            "Total": line[6],
                            "Réglés": line[7],
                            "Impayés": line[8],
                        }
                    )
                line = yield row
                # Start the next lot from a fresh dict.
                row = {}
                current_state = "new_row"
            else:
                line = yield
def clean_type(string):
    """Shorten an apartment type to its last two characters.

    Strings that do not contain "appartement" (any case) are returned as is.
    """
    return string[-2:] if "appartement" in string.lower() else string
def join_row(table):
    """Propagate lot and tenant information down the rows of a flat table.

    Rows are handled according to their ``Locataires`` cell:

    * starts with "Lot": parsed with ``parse_lot``; the cell is then blanked.
    * "Rappel de Loyer": inherits Lot / Type / Locataires from the previous
      kept row and is flagged in ``Divers``.
    * any other non-empty text: replaces the previous kept row, taking over
      its non-empty values and appending this text to the tenant name.
    * empty: a ``Période`` starting with "Solde" replaces the previous kept
      row, one starting with "Du" is added after it (both inherit Lot / Type /
      Locataires); any other row is dropped.

    The row dicts are updated in place; the list of kept rows is returned.
    """
    inherited_keys = ("Lot", "Type", "Locataires")
    joined = []
    for row in table:
        name = row["Locataires"]
        if name.startswith("Lot"):
            row.update(parse_lot(name))
            row["Locataires"] = ""
        elif name == "Rappel de Loyer":
            previous = joined[-1]
            row.update({key: previous[key] for key in inherited_keys})
            row["Divers"] = "Rappel de Loyer"
        elif name:
            previous = joined.pop()
            continuation = name.replace("\n", " ")
            row.update({k: v for k, v in previous.items() if v})
            row["Locataires"] = previous["Locataires"] + " " + continuation
        else:
            period = row["Période"]
            if period.startswith("Solde"):
                previous = joined.pop()
            elif period.startswith("Du"):
                previous = joined[-1]
            else:
                # Neither a balance nor a period line: not kept.
                continue
            row.update({key: previous[key] for key in inherited_keys})
        joined.append(row)
    return joined
def flat_tables(tables):
    """Flatten a list of tables into one list of rows, preserving order."""
    return [row for table in tables for row in table]
def table2df(tables):
    """Build the tenants DataFrame from the per-page extracted tables.

    The tables are flattened, lot and tenant information is propagated with
    ``join_row``, and the result is cleaned and cast to ``DF_TYPES``.
    """
    rows = join_row(flat_tables(tables))
    df = pd.DataFrame.from_records(rows)

    # Keep only the capitalised first letter of the building name.
    df["immeuble"] = df["immeuble"].apply(lambda x: x[0].capitalize())
    df["Type"] = df["Type"].apply(clean_type)

    # Empty strings cannot be cast to float; turn them into NaN first.
    numeric_cols = [name for name, dtype in DF_TYPES.items() if dtype == float]
    df[numeric_cols] = df[numeric_cols].replace("", np.nan)

    # Drop rows carrying neither a tenant nor a period.
    blank = (df["Locataires"] == "") & (df["Période"] == "")
    df = df.drop(df[blank].index)
    return df.astype(DF_TYPES)

View File

@ -1,74 +1,4 @@
from pydantic import BaseModel, field_validator
# Header row of the patrimoine table; fsm() skips any extracted line equal to it.
HEADER_PATRIMOINE = [
"Etage",
"Lots",
"Type de lot",
"Nom du Locataire",
"Loyer Annuel",
"Début Bail",
"Fin Bail",
"Entrée",
"Départ",
"Révisé le",
"U",
"Dépôt Gar.",
]
class Line(BaseModel):
    """Pydantic model for one row of the patrimoine table."""

    mois: int
    annee: int
    immeuble: str
    Etage: str
    Lot: str
    Type: str
    Locataire: str
    Loyer_annuel: int
    Debut_bail: str
    Fin_bail: str
    Entree: str
    Depart: str
    Revision_bail: str
    Usage: str
    Depot_garantie: float

    @field_validator("Loyer_annuel", "Depot_garantie", mode="before")
    def set_default_if_empty(cls, v):
        """Turn an empty-string amount into 0 before type validation."""
        return 0 if v == "" else v
def is_it(page_text):
    """Tell whether *page_text* contains the "VOTRE PATRIMOINE" marker."""
    return "VOTRE PATRIMOINE" in page_text
def fsm():
    """State machine (generator) turning patrimoine-table lines into row dicts.

    Prime it with ``next()``, then ``send()`` each extracted table line (a
    list of cells).  A line with a non-empty first cell returns a dict with
    the keys ``Etage``, ``Lot``, ``Type``, ``Locataire``, ``Loyer_annuel``,
    ``Debut_bail``, ``Fin_bail``, ``Entree``, ``Depart``, ``Revision_bail``,
    ``Usage`` and ``Depot_garantie``; any other line returns ``None``.
    """
    # NOTE(review): nesting below was restored from a listing that had lost
    # its indentation — confirm against the original file.
    current_state = "new_line"
    row = {}
    line = yield
    while True:
        # A repeated header is consumed; the line received next is then
        # processed in this same iteration.
        if line == HEADER_PATRIMOINE:
            line = yield
        if current_state == "new_line":
            if line[0] != "":
                row = {
                    "Etage": line[0],
                    # Empty lot / type cells reuse the previous row's value.
                    # NOTE(review): `row` starts as {}, so a first data line
                    # with an empty cell here would raise KeyError.
                    "Lot": line[1][-2:] if line[1] != "" else row["Lot"],
                    "Type": line[2] if line[2] != "" else row["Type"],
                    "Locataire": line[3],
                    # Spaces are stripped from the two amount cells.
                    "Loyer_annuel": line[4].replace(" ", ""),
                    "Debut_bail": line[5],
                    "Fin_bail": line[6],
                    "Entree": line[7],
                    "Depart": line[8],
                    "Revision_bail": line[9],
                    "Usage": line[10],
                    "Depot_garantie": line[11].replace(" ", ""),
                }
                line = yield row
            else:
                line = yield

View File

@ -4,7 +4,7 @@ from pathlib import Path
import click
from .extract import extract_save, extract_plan
from .extract import extract_save
from .join import join_excel
@ -42,83 +42,27 @@ def extract():
@extract.command()
@click.argument("pdf_file", required=1)
@click.option("--dest", help="Où mettre les fichiers produits", default="")
@click.option(
"--only-plan",
help="Ne produit rien mais indique les changements",
default=False,
is_flag=True,
)
@click.option(
"--force",
help="Écrase les fichiers produits précédemment",
default=False,
is_flag=True,
)
def on(pdf_file, dest, force, only_plan):
pdf_file = Path(pdf_file)
def on(pdf_file, dest):
if not dest:
pdf_path = Path(pdf_file)
dest = pdf_path.parent
else:
dest = Path(dest)
assert pdf_file.exists()
logging.info(f"Found {pdf_file}")
plan_dest = extract_plan(pdf_file, dest)
save = []
for k, p in plan_dest.items():
if not p.exists() or force:
save.append(k)
if only_plan:
for s in save:
logging.info(f"Planing to create {plan_dest[s]}")
else:
dest.mkdir(parents=True, exist_ok=True)
extract_save(pdf_file, dest, save)
extract_save(pdf_file, dest)
@extract.command()
@click.option(
"--src", help="Tous les fichiers dans folder (de façon récursive)", default="./"
)
@click.option("--src", help="Tous les fichiers dans folder", default="./")
@click.option("--dest", help="Où mettre les fichiers produits", default="./")
@click.option(
"--only-plan",
help="Ne produit rien mais indique les changements",
default=False,
is_flag=True,
)
@click.option(
"--force",
help="Écrase les fichiers produits précédemment",
default=False,
is_flag=True,
)
def all(src, dest, force, only_plan):
src_path = Path(src)
def all(src, dest):
p = Path(src)
dest = Path(dest)
dest.mkdir(exist_ok=True)
d = Path(dest)
d.mkdir(exist_ok=True)
for pdf_file in src_path.rglob("**/*.pdf"):
relative_path = pdf_file.relative_to(src_path)
files_dest = dest / relative_path.parent
pdf_files = [x for x in p.iterdir() if ".pdf" in str(x)]
for pdf_file in pdf_files:
logging.info(f"Found {pdf_file}")
plan_dest = extract_plan(pdf_file, files_dest)
save = []
for k, p in plan_dest.items():
if not p.exists() or force:
save.append(k)
if only_plan:
for s in save:
logging.info(f"Planing to create {plan_dest[s]}")
else:
files_dest.mkdir(parents=True, exist_ok=True)
extract_save(pdf_file, files_dest, save)
extract_save(pdf_file, d)
@main.command()

View File

@ -1,5 +1,3 @@
pdfplumber
numpy
pandas
click
openpyxl