Compare commits
No commits in common. "rework_extract" and "main" have entirely different histories.
rework_ext
...
main
1345
Extract pdf.ipynb
1345
Extract pdf.ipynb
File diff suppressed because one or more lines are too long
20
README.md
20
README.md
@ -1,23 +1,3 @@
|
|||||||
# PDF AURALIA
|
# PDF AURALIA
|
||||||
|
|
||||||
Extraction de fichiers de comptabilité en pdf vers xlsx.
|
Extraction de fichiers de comptabilité en pdf vers xlsx.
|
||||||
|
|
||||||
## Utilisation
|
|
||||||
|
|
||||||
- Lancement sur un fichier pdf particulier
|
|
||||||
|
|
||||||
```bash
|
|
||||||
pdf_oralia extract on <pdf_file> --dest <where to put produced files>
|
|
||||||
```
|
|
||||||
|
|
||||||
- Lancement sur tous les fichiers d'un répertoire (récursivement)
|
|
||||||
|
|
||||||
```bash
|
|
||||||
pdf_oralia extract all --src <source folder> --dest <destination folder>
|
|
||||||
```
|
|
||||||
|
|
||||||
Cette commande reproduira la structure du dossier source dans destination. Seuls les fichiers non existants seront traités. Par défaut, les fichiers déjà produits ne seront pas écrasés.
|
|
||||||
On peut ajouter les options suivantes:
|
|
||||||
|
|
||||||
- `--force`: pour écraser les fichiers déjà traités
|
|
||||||
- `--only-plan`: pour voir quels fichiers pourraient être créés sans le faire.
|
|
||||||
|
@ -1,11 +1,10 @@
|
|||||||
import logging
|
import logging
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import pandas as pd
|
|
||||||
|
|
||||||
import pdfplumber
|
import pdfplumber
|
||||||
|
|
||||||
from pdf_oralia.pages import charge, locataire, patrimoine
|
from pdf_oralia.pages import charge, locataire, patrimoine, recapitulatif
|
||||||
|
|
||||||
extract_table_settings = {
|
extract_table_settings = {
|
||||||
"vertical_strategy": "lines",
|
"vertical_strategy": "lines",
|
||||||
@ -13,10 +12,6 @@ extract_table_settings = {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
class ExtractError(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
def extract_date(page_text):
|
def extract_date(page_text):
|
||||||
"""Extract date from a page
|
"""Extract date from a page
|
||||||
|
|
||||||
@ -37,100 +32,68 @@ def extract_building(page_text, buildings=["bloch", "marietton", "servient"]):
|
|||||||
raise ValueError("Pas d'immeuble trouvé")
|
raise ValueError("Pas d'immeuble trouvé")
|
||||||
|
|
||||||
|
|
||||||
def pdf_extract_tables_lines(pdf):
|
def catch_malformed_table(tables):
|
||||||
loc_sink = locataire.fsm()
|
if len(tables) == 2:
|
||||||
next(loc_sink)
|
return tables[0] + tables[1]
|
||||||
charge_sink = charge.fsm()
|
return tables[0]
|
||||||
next(charge_sink)
|
|
||||||
patrimoine_sink = patrimoine.fsm()
|
|
||||||
next(patrimoine_sink)
|
|
||||||
|
|
||||||
for page_number, page in enumerate(pdf.pages):
|
|
||||||
page_text = page.extract_text()
|
|
||||||
date = extract_date(page_text)
|
|
||||||
try:
|
|
||||||
additionnal_fields = {
|
|
||||||
"immeuble": extract_building(page_text),
|
|
||||||
"mois": date.strftime("%m"),
|
|
||||||
"annee": date.strftime("%Y"),
|
|
||||||
}
|
|
||||||
except ValueError:
|
|
||||||
logging.warning(
|
|
||||||
f"L'immeuble de la page {page_number+1} non identifiable. Page ignorée."
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
table_type = ""
|
|
||||||
if locataire.is_it(page_text):
|
|
||||||
table_type = "locataire"
|
|
||||||
elif charge.is_it(page_text):
|
|
||||||
table_type = "charge"
|
|
||||||
elif patrimoine.is_it(page_text):
|
|
||||||
table_type = "patrimoine"
|
|
||||||
else:
|
|
||||||
logging.warning(
|
|
||||||
f"Type de la page {page_number+1} non identifiable. Page ignorée."
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
|
|
||||||
for line in page.extract_table(extract_table_settings):
|
|
||||||
if table_type == "locataire":
|
|
||||||
res = loc_sink.send(line)
|
|
||||||
if res:
|
|
||||||
res.update(additionnal_fields)
|
|
||||||
yield locataire.Line(**res)
|
|
||||||
elif table_type == "charge":
|
|
||||||
res = charge_sink.send(line)
|
|
||||||
if res:
|
|
||||||
res.update(additionnal_fields)
|
|
||||||
yield charge.Line(**res)
|
|
||||||
|
|
||||||
elif table_type == "patrimoine":
|
|
||||||
res = patrimoine_sink.send(line)
|
|
||||||
if res:
|
|
||||||
res.update(additionnal_fields)
|
|
||||||
yield patrimoine.Line(**res)
|
|
||||||
|
|
||||||
|
|
||||||
def from_pdf(pdf_file):
|
def from_pdf(pdf_file):
|
||||||
"""Build dataframes one about charges and another on loc"""
|
"""Build dataframes one about charges and another on loc"""
|
||||||
pdf = pdfplumber.open(pdf_file)
|
pdf = pdfplumber.open(pdf_file)
|
||||||
locataire_lines = []
|
recapitulatif_tables = []
|
||||||
charge_lines = []
|
loc_tables = []
|
||||||
patrimoine_lines = []
|
charge_tables = []
|
||||||
for line in pdf_extract_tables_lines(pdf):
|
patrimoie_tables = []
|
||||||
if isinstance(line, locataire.Line):
|
|
||||||
locataire_lines.append(line)
|
for page_number, page in enumerate(pdf.pages):
|
||||||
elif isinstance(line, charge.Line):
|
page_text = page.extract_text()
|
||||||
charge_lines.append(line)
|
date = extract_date(page_text)
|
||||||
elif isinstance(line, patrimoine.Line):
|
additionnal_fields = {
|
||||||
patrimoine_lines.append(line)
|
"immeuble": extract_building(page_text),
|
||||||
|
"mois": date.strftime("%m"),
|
||||||
|
"annee": date.strftime("%Y"),
|
||||||
|
}
|
||||||
|
|
||||||
|
if recapitulatif.is_it(page_text):
|
||||||
|
table = page.extract_tables()[0]
|
||||||
|
extracted = recapitulatif.extract(table, additionnal_fields)
|
||||||
|
if extracted:
|
||||||
|
recapitulatif_tables.append(extracted)
|
||||||
|
|
||||||
|
elif locataire.is_it(page_text):
|
||||||
|
tables = page.extract_tables(extract_table_settings)[1:]
|
||||||
|
table = catch_malformed_table(tables)
|
||||||
|
extracted = locataire.extract(table, additionnal_fields)
|
||||||
|
loc_tables.append(extracted)
|
||||||
|
|
||||||
|
elif charge.is_it(page_text):
|
||||||
|
tables = page.extract_tables(extract_table_settings)[1:]
|
||||||
|
table = catch_malformed_table(tables)
|
||||||
|
extracted = charge.extract(table, additionnal_fields)
|
||||||
|
charge_tables.append(extracted)
|
||||||
|
|
||||||
|
elif patrimoine.is_it(page_text):
|
||||||
|
pass
|
||||||
|
|
||||||
else:
|
else:
|
||||||
logging.warning(f"Page {page_number+1} non reconnu. Page ignorée.")
|
logging.warning(f"Page {page_number+1} non reconnu. Page ignorée.")
|
||||||
|
|
||||||
return {
|
df_charge = charge.table2df(recapitulatif_tables + charge_tables)
|
||||||
"charge": pd.DataFrame([c.__dict__ for c in charge_lines]),
|
df_loc = locataire.table2df(loc_tables)
|
||||||
"locataire": pd.DataFrame([c.__dict__ for c in locataire_lines]),
|
|
||||||
"patrimoine": pd.DataFrame([c.__dict__ for c in patrimoine_lines]),
|
return df_charge, df_loc
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def extract_plan(pdf_file, dest):
|
def extract_save(pdf_file, dest):
|
||||||
return {
|
|
||||||
"charge": Path(dest) / f"{pdf_file.stem.replace(' ', '_')}_charge.xlsx",
|
|
||||||
"locataire": Path(dest) / f"{pdf_file.stem.replace(' ', '_')}_locataire.xlsx",
|
|
||||||
"patrimoine": Path(dest) / f"{pdf_file.stem.replace(' ', '_')}_patrimoine.xlsx",
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def extract_save(pdf_file, dest, save=[]):
|
|
||||||
"""Extract charge and locataire for pdf_file and put xlsx file in dest"""
|
"""Extract charge and locataire for pdf_file and put xlsx file in dest"""
|
||||||
pdf_file = Path(pdf_file)
|
pdf_file = Path(pdf_file)
|
||||||
xlss = extract_plan(pdf_file, dest)
|
xls_charge = Path(dest) / f"{pdf_file.stem.replace(' ', '_')}_charge.xlsx"
|
||||||
|
xls_locataire = Path(dest) / f"{pdf_file.stem.replace(' ', '_')}_locataire.xlsx"
|
||||||
|
|
||||||
dfs = from_pdf(pdf_file)
|
df_charge, df_loc = from_pdf(pdf_file)
|
||||||
|
|
||||||
for s in save:
|
df_charge.to_excel(xls_charge, sheet_name="Charges", index=False)
|
||||||
dfs[s].to_excel(xlss[s], sheet_name=s, index=False)
|
logging.info(f"{xls_charge} saved")
|
||||||
logging.info(f"{xlss[s]} saved")
|
df_loc.to_excel(xls_locataire, sheet_name="Location", index=False)
|
||||||
|
logging.info(f"{xls_locataire} saved")
|
||||||
return {k: v for k, v in xlss.items() if k in save}
|
|
||||||
|
@ -1,16 +1,9 @@
|
|||||||
import re
|
import re
|
||||||
from pydantic import BaseModel, field_validator
|
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
HEADER_CHARGE = [
|
RECAPITULATIF_DES_OPERATIONS = 1
|
||||||
"",
|
|
||||||
"RECAPITULATIF DES OPERATIONS",
|
|
||||||
"Débits",
|
|
||||||
"Crédits",
|
|
||||||
"Dont T.V.A.",
|
|
||||||
"Locatif",
|
|
||||||
"Déductible",
|
|
||||||
]
|
|
||||||
DF_TYPES = {
|
DF_TYPES = {
|
||||||
"Fournisseur": str,
|
"Fournisseur": str,
|
||||||
"RECAPITULATIF DES OPERATIONS": str,
|
"RECAPITULATIF DES OPERATIONS": str,
|
||||||
@ -24,30 +17,7 @@ DF_TYPES = {
|
|||||||
"annee": str,
|
"annee": str,
|
||||||
"lot": str,
|
"lot": str,
|
||||||
}
|
}
|
||||||
|
DEFAULT_FOURNISSEUR = "ROSIER MODICA MOTTEROZ SA"
|
||||||
|
|
||||||
class Line(BaseModel):
|
|
||||||
mois: int
|
|
||||||
annee: int
|
|
||||||
immeuble: str
|
|
||||||
lot: str
|
|
||||||
Champs: str
|
|
||||||
Categorie: str
|
|
||||||
Fournisseur: str
|
|
||||||
Libellé: str
|
|
||||||
Débit: float
|
|
||||||
Crédits: float
|
|
||||||
Dont_TVA: float
|
|
||||||
Locatif: float
|
|
||||||
Déductible: float
|
|
||||||
|
|
||||||
@field_validator(
|
|
||||||
"Débit", "Crédits", "Dont_TVA", "Locatif", "Déductible", mode="before"
|
|
||||||
)
|
|
||||||
def set_default_if_empty(cls, v):
|
|
||||||
if v == "":
|
|
||||||
return 0
|
|
||||||
return v
|
|
||||||
|
|
||||||
|
|
||||||
def is_it(page_text):
|
def is_it(page_text):
|
||||||
@ -71,54 +41,51 @@ def get_lot(txt):
|
|||||||
return "*"
|
return "*"
|
||||||
|
|
||||||
|
|
||||||
def fsm():
|
def keep_row(row):
|
||||||
current_state = "total"
|
return not any(
|
||||||
row = {}
|
[
|
||||||
line = yield
|
word.lower() in row[RECAPITULATIF_DES_OPERATIONS].lower()
|
||||||
while True:
|
for word in ["TOTAL", "TOTAUX", "Solde créditeur", "Solde débiteur"]
|
||||||
if line == HEADER_CHARGE:
|
]
|
||||||
line = yield
|
)
|
||||||
if current_state == "total":
|
|
||||||
if line[1].lower().split(" ")[0] in ["total", "totaux"]:
|
|
||||||
current_state = "new_champs"
|
def extract(table, additionnal_fields: dict = {}):
|
||||||
line = yield
|
"""Turn table to dictionary with additional fields"""
|
||||||
elif current_state == "new_champs":
|
extracted = []
|
||||||
if line[0] != "":
|
header = table[0]
|
||||||
current_state = "new_cat_line"
|
for row in table[1:]:
|
||||||
row = {"Champs": line[0], "Categorie": "", "Fournisseur": ""}
|
if keep_row(row):
|
||||||
line = yield
|
r = dict()
|
||||||
elif current_state == "new_cat_line":
|
for i, value in enumerate(row):
|
||||||
if line[1].lower().split(" ")[0] in ["total", "totaux"]:
|
if header[i] == "":
|
||||||
current_state = "new_champs"
|
r["Fournisseur"] = value
|
||||||
line = yield
|
else:
|
||||||
row = {}
|
r[header[i]] = value
|
||||||
elif line[2] != "" or line[3] != "":
|
|
||||||
row.update(
|
for k, v in additionnal_fields.items():
|
||||||
{
|
r[k] = v
|
||||||
"Fournisseur": line[0] if line[0] != "" else row["Fournisseur"],
|
|
||||||
"Libellé": line[1],
|
if "honoraire" in row[RECAPITULATIF_DES_OPERATIONS].lower():
|
||||||
"lot": get_lot(line[1]),
|
r["Fournisseur"] = DEFAULT_FOURNISSEUR
|
||||||
"Débit": line[2],
|
|
||||||
"Crédits": line[3],
|
extracted.append(r)
|
||||||
"Dont_TVA": line[4],
|
|
||||||
"Locatif": line[5],
|
return extracted
|
||||||
"Déductible": line[6],
|
|
||||||
}
|
|
||||||
)
|
def table2df(tables):
|
||||||
line = yield row
|
dfs = []
|
||||||
row = {
|
for table in tables:
|
||||||
"Champs": row["Champs"],
|
df = (
|
||||||
"Categorie": row["Categorie"],
|
pd.DataFrame.from_records(table)
|
||||||
"Fournisseur": row["Fournisseur"],
|
.replace("", np.nan)
|
||||||
}
|
.dropna(subset=["Débits", "Crédits"], how="all")
|
||||||
elif line[0] != "" and line[1] == "":
|
)
|
||||||
row.update({"Categorie": line[0]})
|
df["Fournisseur"] = df["Fournisseur"].fillna(method="ffill")
|
||||||
line = yield
|
dfs.append(df)
|
||||||
elif line[1] != "":
|
df = pd.concat(dfs)
|
||||||
row.update({"Categorie": line[1]})
|
|
||||||
line = yield
|
df["immeuble"] = df["immeuble"].apply(lambda x: x[0].capitalize())
|
||||||
elif line[0] != "":
|
df["lot"] = df["RECAPITULATIF DES OPERATIONS"].apply(get_lot)
|
||||||
row.update({"Fournisseur": line[0]})
|
return df.astype(DF_TYPES)
|
||||||
line = yield
|
|
||||||
else:
|
|
||||||
line = yield
|
|
||||||
|
@ -1,48 +1,22 @@
|
|||||||
from pydantic import BaseModel, field_validator
|
import numpy as np
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
HEADER_LOC = [
|
DF_TYPES = {
|
||||||
"Locataires",
|
"Locataires": str,
|
||||||
"Période",
|
"Période": str,
|
||||||
"Loyers",
|
"Loyers": float,
|
||||||
"Taxes",
|
"Taxes": float,
|
||||||
"Provisions",
|
"Provisions": float,
|
||||||
"Divers",
|
"Divers": str,
|
||||||
"",
|
"Total": float,
|
||||||
"Total",
|
"Réglés": float,
|
||||||
"Réglés",
|
"Impayés": float,
|
||||||
"Impayés",
|
"immeuble": str,
|
||||||
]
|
"mois": str,
|
||||||
|
"annee": str,
|
||||||
|
"Lot": str,
|
||||||
class Line(BaseModel):
|
"Type": str,
|
||||||
mois: int
|
}
|
||||||
annee: int
|
|
||||||
immeuble: str
|
|
||||||
Lot: str
|
|
||||||
Type: str
|
|
||||||
Locataire: str
|
|
||||||
Loyers: float
|
|
||||||
Taxes: float
|
|
||||||
Provisions: float
|
|
||||||
Divers: float
|
|
||||||
Total: float
|
|
||||||
Réglés: float
|
|
||||||
Impayés: float
|
|
||||||
|
|
||||||
@field_validator(
|
|
||||||
"Loyers",
|
|
||||||
"Taxes",
|
|
||||||
"Provisions",
|
|
||||||
"Divers",
|
|
||||||
"Total",
|
|
||||||
"Réglés",
|
|
||||||
"Impayés",
|
|
||||||
mode="before",
|
|
||||||
)
|
|
||||||
def set_default_if_empty(cls, v):
|
|
||||||
if v == "":
|
|
||||||
return 0
|
|
||||||
return v
|
|
||||||
|
|
||||||
|
|
||||||
def is_it(page_text):
|
def is_it(page_text):
|
||||||
@ -51,56 +25,142 @@ def is_it(page_text):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def is_drop(row):
    """Tell whether a raw table row must be discarded.

    A row is dropped when every cell is empty, or when its first cell
    contains the word "totaux" (case-insensitive).

    Args:
        row: Sequence of cell values. Cells are expected to be strings;
            empty cells given as "" or None are tolerated.

    Returns:
        True if the row should be skipped, False otherwise.
    """
    # Check emptiness first so an empty row never reaches the row[0] lookup.
    if not any(row):
        return True
    # An empty first cell may be None rather than ""; normalise before .lower().
    first_cell = row[0] or ""
    return "totaux" in first_cell.lower()
|
||||||
|
|
||||||
|
|
||||||
|
def extract(table, additionnal_fields: "dict | None" = None):
    """Turn a raw table into a list of row dictionaries.

    The first row of ``table`` is used as the header. Every following row
    that ``is_drop`` does not reject becomes a dict mapping header names to
    cell values. Columns whose header is the empty string are left out, and
    ``additionnal_fields`` is merged into every produced dict.

    Args:
        table: List of rows (each a list of cells), header first.
        additionnal_fields: Extra key/value pairs added to each row.
            Defaults to no extra fields.

    Returns:
        A list of dicts, one per kept row. Empty if ``table`` is empty.
    """
    if not table:
        return []

    extra = additionnal_fields or {}
    header = table[0]
    extracted = []
    for row in table[1:]:
        if is_drop(row):
            continue
        # zip() stops at the shorter sequence, so a row that is longer than
        # the header no longer raises IndexError.
        record = {name: value for name, value in zip(header, row) if name != ""}
        record.update(extra)
        extracted.append(record)
    return extracted
|
||||||
|
|
||||||
|
|
||||||
|
def join_row(last, next):
    """Merge two partial row dictionaries into a single row.

    For each key:
      * equal values are kept as is;
      * two different non-empty values are joined with a newline;
      * otherwise the non-empty one is kept, or "" if both are empty.

    Keys missing from one of the rows are treated as empty instead of
    raising ``KeyError``, and keys only present in ``next`` are kept.

    NOTE(review): the parameter name ``next`` shadows the builtin; it is
    kept for backward compatibility. Another ``join_row(table)`` appears
    further down in the same diff hunk and would rebind this name if both
    live in the same module -- confirm which one is intended.

    Args:
        last: Row dict holding the first part of the data.
        next: Row dict holding the continuation of the data.

    Returns:
        A new dict with the merged values; inputs are not modified.
    """
    merged = {}
    # Preserve the key order of `last`, then append keys only found in `next`.
    keys = list(last) + [key for key in next if key not in last]
    for key in keys:
        first, second = last.get(key), next.get(key)
        if first == second:
            merged[key] = first
        elif first and second:
            merged[key] = f"{first}\n{second}"
        else:
            merged[key] = first or second or ""
    return merged
|
||||||
|
|
||||||
|
|
||||||
|
def join_tables(tables):
    """Concatenate per-page tables, stitching rows split across two tables.

    Tables are appended in order. When the last row accumulated so far is
    not a "totaux" row (tested on its "Locataires" value, case-insensitive),
    it is considered unfinished and is merged with the first row of the next
    table; otherwise the next table is appended unchanged.

    Args:
        tables: List of tables, each a list of row dicts.

    Returns:
        A new flat list of row dicts. The input lists are not modified.
        Empty input gives an empty list; empty tables are skipped.
    """
    if not tables:
        return []

    # Copy so that the caller's first table is never extended in place.
    joined = list(tables[0])
    for table in tables[1:]:
        if not table:
            continue
        unfinished = bool(joined) and (
            "totaux" not in (joined[-1].get("Locataires") or "").lower()
        )
        if unfinished:
            joined[-1] = _merge_rows(joined[-1], table[0])
            joined.extend(table[1:])
        else:
            joined.extend(table)
    return joined


def _merge_rows(first_part, second_part):
    """Merge two halves of one row: equal values kept, differing ones joined by a newline."""
    # Kept private to this block so that the merge does not depend on the
    # module-level name `join_row`.
    merged = {}
    keys = list(first_part) + [key for key in second_part if key not in first_part]
    for key in keys:
        first, second = first_part.get(key), second_part.get(key)
        if first == second:
            merged[key] = first
        elif first and second:
            merged[key] = f"{first}\n{second}"
        else:
            merged[key] = first or second or ""
    return merged
|
||||||
|
|
||||||
|
|
||||||
def parse_lot(string):
|
def parse_lot(string):
|
||||||
words = string.split(" ")
|
words = string.split(" ")
|
||||||
return {"Lot": "{:02d}".format(int(words[1])), "Type": " ".join(words[2:])}
|
return {"Lot": "{:02d}".format(int(words[1])), "Type": " ".join(words[2:])}
|
||||||
|
|
||||||
|
|
||||||
def fsm():
|
def clean_type(string):
|
||||||
current_state = "new_row"
|
if "appartement" in string.lower():
|
||||||
row = {}
|
return string[-2:]
|
||||||
line = yield
|
return string
|
||||||
while True:
|
|
||||||
if line == HEADER_LOC:
|
|
||||||
line = yield
|
def join_row(table):
|
||||||
elif current_state == "new_row":
|
joined = []
|
||||||
if line[0] != "" and line[0] != "TOTAUX":
|
for row in table:
|
||||||
row.update(parse_lot(line[0]))
|
if row["Locataires"].startswith("Lot"):
|
||||||
current_state = "add_loc"
|
row.update(parse_lot(row["Locataires"]))
|
||||||
line = yield
|
row["Locataires"] = ""
|
||||||
elif current_state == "add_loc":
|
joined.append(row)
|
||||||
if line[0] != "":
|
elif row["Locataires"] == "Rappel de Loyer":
|
||||||
row["Locataire"] = line[0]
|
last_row = joined[-1]
|
||||||
current_state = "add_totaux"
|
row.update(
|
||||||
line = yield
|
{
|
||||||
elif current_state == "add_totaux":
|
"Lot": last_row["Lot"],
|
||||||
if line[0] == "Totaux":
|
"Type": last_row["Type"],
|
||||||
if line[6] is None:
|
"Locataires": last_row["Locataires"],
|
||||||
row.update(
|
"Divers": "Rappel de Loyer",
|
||||||
{
|
}
|
||||||
"Loyers": line[2],
|
)
|
||||||
"Taxes": line[3],
|
joined.append(row)
|
||||||
"Provisions": line[4],
|
|
||||||
"Divers": line[5],
|
elif row["Locataires"]:
|
||||||
"Total": line[7],
|
last_row = joined.pop()
|
||||||
"Réglés": line[8],
|
row_name = row["Locataires"].replace("\n", " ")
|
||||||
"Impayés": line[9],
|
row.update({k: v for k, v in last_row.items() if v})
|
||||||
}
|
row["Locataires"] = last_row["Locataires"] + " " + row_name
|
||||||
)
|
joined.append(row)
|
||||||
else:
|
|
||||||
row.update(
|
else:
|
||||||
{
|
if row["Période"].startswith("Solde"):
|
||||||
"Loyers": line[2],
|
last_row = joined.pop()
|
||||||
"Taxes": line[3],
|
row.update(
|
||||||
"Provisions": line[4],
|
{
|
||||||
"Divers": line[5],
|
"Lot": last_row["Lot"],
|
||||||
"Total": line[6],
|
"Type": last_row["Type"],
|
||||||
"Réglés": line[7],
|
"Locataires": last_row["Locataires"],
|
||||||
"Impayés": line[8],
|
}
|
||||||
}
|
)
|
||||||
)
|
joined.append(row)
|
||||||
line = yield row
|
|
||||||
row = {}
|
elif row["Période"].startswith("Du"):
|
||||||
current_state = "new_row"
|
last_row = joined[-1]
|
||||||
else:
|
row.update(
|
||||||
line = yield
|
{
|
||||||
|
"Lot": last_row["Lot"],
|
||||||
|
"Type": last_row["Type"],
|
||||||
|
"Locataires": last_row["Locataires"],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
joined.append(row)
|
||||||
|
|
||||||
|
return joined
|
||||||
|
|
||||||
|
|
||||||
|
def flat_tables(tables):
    """Return one flat list holding the rows of every table, in order."""
    return [row for table in tables for row in table]
|
||||||
|
|
||||||
|
|
||||||
|
def table2df(tables):
    """Build the tenant DataFrame from the extracted per-page tables.

    Steps, in order:
      1. flatten the tables with ``flat_tables`` and pass the rows through
         ``join_row``;
      2. replace "immeuble" by its capitalised first character and clean
         "Type" with ``clean_type``;
      3. turn empty strings into NaN in every column declared as float in
         ``DF_TYPES``;
      4. drop rows whose "Locataires" and "Période" are both empty;
      5. cast the columns to the types declared in ``DF_TYPES``.

    Args:
        tables: List of tables, each a list of row dicts.

    Returns:
        A pandas DataFrame typed according to ``DF_TYPES``.
    """
    rows = join_row(flat_tables(tables))
    frame = pd.DataFrame.from_records(rows)

    frame["immeuble"] = frame["immeuble"].apply(lambda name: name[0].capitalize())
    frame["Type"] = frame["Type"].apply(clean_type)

    float_columns = [column for column, dtype in DF_TYPES.items() if dtype == float]
    frame[float_columns] = frame[float_columns].replace("", np.nan)

    # Rows with neither a tenant nor a period carry no usable information.
    is_blank = (frame["Locataires"] == "") & (frame["Période"] == "")
    frame = frame.drop(frame[is_blank].index)

    return frame.astype(DF_TYPES)
|
||||||
|
@ -1,74 +1,4 @@
|
|||||||
from pydantic import BaseModel, field_validator
|
|
||||||
|
|
||||||
HEADER_PATRIMOINE = [
|
|
||||||
"Etage",
|
|
||||||
"Lots",
|
|
||||||
"Type de lot",
|
|
||||||
"Nom du Locataire",
|
|
||||||
"Loyer Annuel",
|
|
||||||
"Début Bail",
|
|
||||||
"Fin Bail",
|
|
||||||
"Entrée",
|
|
||||||
"Départ",
|
|
||||||
"Révisé le",
|
|
||||||
"U",
|
|
||||||
"Dépôt Gar.",
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
class Line(BaseModel):
|
|
||||||
mois: int
|
|
||||||
annee: int
|
|
||||||
immeuble: str
|
|
||||||
Etage: str
|
|
||||||
Lot: str
|
|
||||||
Type: str
|
|
||||||
Locataire: str
|
|
||||||
Loyer_annuel: int
|
|
||||||
Debut_bail: str
|
|
||||||
Fin_bail: str
|
|
||||||
Entree: str
|
|
||||||
Depart: str
|
|
||||||
Revision_bail: str
|
|
||||||
Usage: str
|
|
||||||
Depot_garantie: float
|
|
||||||
|
|
||||||
@field_validator("Loyer_annuel", "Depot_garantie", mode="before")
|
|
||||||
def set_default_if_empty(cls, v):
|
|
||||||
if v == "":
|
|
||||||
return 0
|
|
||||||
return v
|
|
||||||
|
|
||||||
|
|
||||||
def is_it(page_text):
|
def is_it(page_text):
|
||||||
if "VOTRE PATRIMOINE" in page_text:
|
if "VOTRE PATRIMOINE" in page_text:
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def fsm():
|
|
||||||
current_state = "new_line"
|
|
||||||
row = {}
|
|
||||||
line = yield
|
|
||||||
while True:
|
|
||||||
if line == HEADER_PATRIMOINE:
|
|
||||||
line = yield
|
|
||||||
if current_state == "new_line":
|
|
||||||
if line[0] != "":
|
|
||||||
row = {
|
|
||||||
"Etage": line[0],
|
|
||||||
"Lot": line[1][-2:] if line[1] != "" else row["Lot"],
|
|
||||||
"Type": line[2] if line[2] != "" else row["Type"],
|
|
||||||
"Locataire": line[3],
|
|
||||||
"Loyer_annuel": line[4].replace(" ", ""),
|
|
||||||
"Debut_bail": line[5],
|
|
||||||
"Fin_bail": line[6],
|
|
||||||
"Entree": line[7],
|
|
||||||
"Depart": line[8],
|
|
||||||
"Revision_bail": line[9],
|
|
||||||
"Usage": line[10],
|
|
||||||
"Depot_garantie": line[11].replace(" ", ""),
|
|
||||||
}
|
|
||||||
line = yield row
|
|
||||||
else:
|
|
||||||
line = yield
|
|
||||||
|
@ -4,7 +4,7 @@ from pathlib import Path
|
|||||||
|
|
||||||
import click
|
import click
|
||||||
|
|
||||||
from .extract import extract_save, extract_plan
|
from .extract import extract_save
|
||||||
from .join import join_excel
|
from .join import join_excel
|
||||||
|
|
||||||
|
|
||||||
@ -42,83 +42,27 @@ def extract():
|
|||||||
@extract.command()
|
@extract.command()
|
||||||
@click.argument("pdf_file", required=1)
|
@click.argument("pdf_file", required=1)
|
||||||
@click.option("--dest", help="Où mettre les fichiers produits", default="")
|
@click.option("--dest", help="Où mettre les fichiers produits", default="")
|
||||||
@click.option(
|
def on(pdf_file, dest):
|
||||||
"--only-plan",
|
|
||||||
help="Ne produit rien mais indique les changements",
|
|
||||||
default=False,
|
|
||||||
is_flag=True,
|
|
||||||
)
|
|
||||||
@click.option(
|
|
||||||
"--force",
|
|
||||||
help="Écrase les fichiers produits précédemment",
|
|
||||||
default=False,
|
|
||||||
is_flag=True,
|
|
||||||
)
|
|
||||||
def on(pdf_file, dest, force, only_plan):
|
|
||||||
pdf_file = Path(pdf_file)
|
|
||||||
if not dest:
|
if not dest:
|
||||||
pdf_path = Path(pdf_file)
|
pdf_path = Path(pdf_file)
|
||||||
dest = pdf_path.parent
|
dest = pdf_path.parent
|
||||||
else:
|
|
||||||
dest = Path(dest)
|
|
||||||
|
|
||||||
assert pdf_file.exists()
|
extract_save(pdf_file, dest)
|
||||||
logging.info(f"Found {pdf_file}")
|
|
||||||
|
|
||||||
plan_dest = extract_plan(pdf_file, dest)
|
|
||||||
save = []
|
|
||||||
for k, p in plan_dest.items():
|
|
||||||
if not p.exists() or force:
|
|
||||||
save.append(k)
|
|
||||||
|
|
||||||
if only_plan:
|
|
||||||
for s in save:
|
|
||||||
logging.info(f"Planing to create {plan_dest[s]}")
|
|
||||||
else:
|
|
||||||
dest.mkdir(parents=True, exist_ok=True)
|
|
||||||
extract_save(pdf_file, dest, save)
|
|
||||||
|
|
||||||
|
|
||||||
@extract.command()
|
@extract.command()
|
||||||
@click.option(
|
@click.option("--src", help="Tous les fichiers dans folder", default="./")
|
||||||
"--src", help="Tous les fichiers dans folder (de façon récursive)", default="./"
|
|
||||||
)
|
|
||||||
@click.option("--dest", help="Où mettre les fichiers produits", default="./")
|
@click.option("--dest", help="Où mettre les fichiers produits", default="./")
|
||||||
@click.option(
|
def all(src, dest):
|
||||||
"--only-plan",
|
p = Path(src)
|
||||||
help="Ne produit rien mais indique les changements",
|
|
||||||
default=False,
|
|
||||||
is_flag=True,
|
|
||||||
)
|
|
||||||
@click.option(
|
|
||||||
"--force",
|
|
||||||
help="Écrase les fichiers produits précédemment",
|
|
||||||
default=False,
|
|
||||||
is_flag=True,
|
|
||||||
)
|
|
||||||
def all(src, dest, force, only_plan):
|
|
||||||
src_path = Path(src)
|
|
||||||
|
|
||||||
dest = Path(dest)
|
d = Path(dest)
|
||||||
dest.mkdir(exist_ok=True)
|
d.mkdir(exist_ok=True)
|
||||||
|
|
||||||
for pdf_file in src_path.rglob("**/*.pdf"):
|
pdf_files = [x for x in p.iterdir() if ".pdf" in str(x)]
|
||||||
relative_path = pdf_file.relative_to(src_path)
|
for pdf_file in pdf_files:
|
||||||
files_dest = dest / relative_path.parent
|
|
||||||
logging.info(f"Found {pdf_file}")
|
logging.info(f"Found {pdf_file}")
|
||||||
|
extract_save(pdf_file, d)
|
||||||
plan_dest = extract_plan(pdf_file, files_dest)
|
|
||||||
save = []
|
|
||||||
for k, p in plan_dest.items():
|
|
||||||
if not p.exists() or force:
|
|
||||||
save.append(k)
|
|
||||||
|
|
||||||
if only_plan:
|
|
||||||
for s in save:
|
|
||||||
logging.info(f"Planing to create {plan_dest[s]}")
|
|
||||||
else:
|
|
||||||
files_dest.mkdir(parents=True, exist_ok=True)
|
|
||||||
extract_save(pdf_file, files_dest, save)
|
|
||||||
|
|
||||||
|
|
||||||
@main.command()
|
@main.command()
|
||||||
|
@ -1,5 +1,3 @@
|
|||||||
pdfplumber
|
pdfplumber
|
||||||
numpy
|
numpy
|
||||||
pandas
|
pandas
|
||||||
click
|
|
||||||
openpyxl
|
|
||||||
|
Loading…
Reference in New Issue
Block a user