Compare commits: 6e0ffe9085 ... 092b925b68

4 commits:

- 092b925b68
- 3c18bd5d81
- 4ee78a7e7b
- ce8cdc4c1e
Extract pdf.ipynb (1345 changed lines): file diff suppressed because one or more lines are too long.
README.md (20 changed lines)

@@ -1,3 +1,23 @@
 # PDF AURALIA
 
 Extraction de fichiers de comptabilité en pdf vers xlsx.
+
+## Utilisation
+
+- Lancement sur un fichier pdf particulier
+
+```bash
+pdf_oralia extract on <pdf_file> --dest <where to put producted files>
+```
+
+- Lancement sur tous les fichiers d'un repertoire (récursivement )
+
+```bash
+pdf_oralia extract all --src <source folder> --dest <destination folder>
+```
+
+Cette commande reproduira la structure du dossier source dans destination. Seul les fichiers non existants seront traités. Par default, les fichiers déjà produits ne seront pas écrasés.
+On peut ajouter les options suivantes:
+
+- `--force`: pour écraser les fichiers déjà traités
+- `--only-plan`: pour voir quels fichiers pourraient être créé sans le faire.
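For comparison, the same extraction can be driven from Python through the functions this changeset introduces in pdf_oralia/extract.py (`extract_plan` and `extract_save`, shown in the diff below). A minimal sketch; the input file and output folder names are hypothetical:

```python
from pathlib import Path

from pdf_oralia.extract import extract_plan, extract_save

pdf_file = Path("archives/2023_01_marietton.pdf")  # hypothetical input pdf
dest = Path("sorties")                             # hypothetical output folder

# extract_plan only computes the target xlsx paths; nothing is written yet.
plan = extract_plan(pdf_file, dest)

# Mimic the default CLI behaviour: only produce files that do not exist yet.
to_save = [kind for kind, path in plan.items() if not path.exists()]

dest.mkdir(parents=True, exist_ok=True)
extract_save(pdf_file, dest, to_save)  # writes the _charge / _locataire / _patrimoine xlsx files
```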
pdf_oralia/extract.py

@@ -1,10 +1,11 @@
 import logging
 from datetime import datetime
 from pathlib import Path
+import pandas as pd
 
 import pdfplumber
 
-from pdf_oralia.pages import charge, locataire, patrimoine, recapitulatif
+from pdf_oralia.pages import charge, locataire, patrimoine
 
 extract_table_settings = {
     "vertical_strategy": "lines",
@@ -32,68 +33,102 @@ def extract_building(page_text, buildings=["bloch", "marietton", "servient"]):
     raise ValueError("Pas d'immeuble trouvé")
 
 
-def catch_malformed_table(tables):
-    if len(tables) == 2:
-        return tables[0] + tables[1]
-    return tables[0]
-
-
-def from_pdf(pdf_file):
-    """Build dataframes one about charges and another on loc"""
-    pdf = pdfplumber.open(pdf_file)
-    recapitulatif_tables = []
-    loc_tables = []
-    charge_tables = []
-    patrimoie_tables = []
+def pdf_extract_tables_lines(pdf):
+    loc_sink = locataire.fsm()
+    next(loc_sink)
+    charge_sink = charge.fsm()
+    next(charge_sink)
+    patrimoine_sink = patrimoine.fsm()
+    next(patrimoine_sink)
 
     for page_number, page in enumerate(pdf.pages):
         page_text = page.extract_text()
         date = extract_date(page_text)
-        additionnal_fields = {
-            "immeuble": extract_building(page_text),
-            "mois": date.strftime("%m"),
-            "annee": date.strftime("%Y"),
-        }
-
-        if recapitulatif.is_it(page_text):
-            table = page.extract_tables()[0]
-            extracted = recapitulatif.extract(table, additionnal_fields)
-            if extracted:
-                recapitulatif_tables.append(extracted)
-
-        elif locataire.is_it(page_text):
-            tables = page.extract_tables(extract_table_settings)[1:]
-            table = catch_malformed_table(tables)
-            extracted = locataire.extract(table, additionnal_fields)
-            loc_tables.append(extracted)
-
-        elif charge.is_it(page_text):
-            tables = page.extract_tables(extract_table_settings)[1:]
-            table = catch_malformed_table(tables)
-            extracted = charge.extract(table, additionnal_fields)
-            charge_tables.append(extracted)
-
-        elif patrimoine.is_it(page_text):
-            pass
-
+        try:
+            additionnal_fields = {
+                "immeuble": extract_building(page_text),
+                "mois": date.strftime("%m"),
+                "annee": date.strftime("%Y"),
+            }
+        except ValueError:
+            logging.warning(
+                f"L'immeuble de la page {page_number+1} non identifiable. Page ignorée."
+            )
+            continue
+        table_type = ""
+        if locataire.is_it(page_text):
+            table_type = "locataire"
+
+        elif charge.is_it(page_text):
+            table_type = "charge"
+
+        elif patrimoine.is_it(page_text):
+            table_type = "patrimoine"
+        else:
+            logging.warning(
+                f"Type de la page {page_number+1} non identifiable. Page ignorée."
+            )
+            continue
+
+        for line in page.extract_table(extract_table_settings):
+            if table_type == "locataire":
+                res = loc_sink.send(line)
+                if res:
+                    res.update(additionnal_fields)
+                    yield locataire.Line(**res)
+            elif table_type == "charge":
+                res = charge_sink.send(line)
+                if res:
+                    res.update(additionnal_fields)
+                    yield charge.Line(**res)
+
+            elif table_type == "patrimoine":
+                res = patrimoine_sink.send(line)
+                if res:
+                    res.update(additionnal_fields)
+                    yield patrimoine.Line(**res)
+
+
+def from_pdf(pdf_file):
+    """Build dataframes one about charges and another on loc"""
+    pdf = pdfplumber.open(pdf_file)
+    locataire_lines = []
+    charge_lines = []
+    patrimoine_lines = []
+    for line in pdf_extract_tables_lines(pdf):
+        if isinstance(line, locataire.Line):
+            locataire_lines.append(line)
+        elif isinstance(line, charge.Line):
+            charge_lines.append(line)
+        elif isinstance(line, patrimoine.Line):
+            patrimoine_lines.append(line)
         else:
             logging.warning(f"Page {page_number+1} non reconnu. Page ignorée.")
 
-    df_charge = charge.table2df(recapitulatif_tables + charge_tables)
-    df_loc = locataire.table2df(loc_tables)
-
-    return df_charge, df_loc
+    return {
+        "charge": pd.DataFrame([c.__dict__ for c in charge_lines]),
+        "locataire": pd.DataFrame([c.__dict__ for c in locataire_lines]),
+        "patrimoine": pd.DataFrame([c.__dict__ for c in patrimoine_lines]),
+    }
 
 
-def extract_save(pdf_file, dest):
+def extract_plan(pdf_file, dest):
+    return {
+        "charge": Path(dest) / f"{pdf_file.stem.replace(' ', '_')}_charge.xlsx",
+        "locataire": Path(dest) / f"{pdf_file.stem.replace(' ', '_')}_locataire.xlsx",
+        "patrimoine": Path(dest) / f"{pdf_file.stem.replace(' ', '_')}_patrimoine.xlsx",
+    }
+
+
+def extract_save(pdf_file, dest, save=[]):
     """Extract charge and locataire for pdf_file and put xlsx file in dest"""
     pdf_file = Path(pdf_file)
-    xls_charge = Path(dest) / f"{pdf_file.stem.replace(' ', '_')}_charge.xlsx"
-    xls_locataire = Path(dest) / f"{pdf_file.stem.replace(' ', '_')}_locataire.xlsx"
-
-    df_charge, df_loc = from_pdf(pdf_file)
-
-    df_charge.to_excel(xls_charge, sheet_name="Charges", index=False)
-    logging.info(f"{xls_charge} saved")
-    df_loc.to_excel(xls_locataire, sheet_name="Location", index=False)
-    logging.info(f"{xls_locataire} saved")
+    xlss = extract_plan(pdf_file, dest)
+
+    if save != []:
+        dfs = from_pdf(pdf_file)
+
+        for s in save:
+            dfs[s].to_excel(xlss[s], sheet_name=s, index=False)
+            logging.info(f"{xlss[s]} saved")
+        return {k: v for k, v in xlss.items() if k in save}
+
+    return xlss
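The rewrite replaces per-page table post-processing with coroutine "sinks": each page module exposes an `fsm()` generator that `pdf_extract_tables_lines` primes with `next()` and then feeds one table row at a time through `.send()`; whenever the state machine has accumulated a complete record it yields a dict, which is merged with `additionnal_fields` and validated as a `Line`. A minimal standalone sketch of that send/yield protocol (a toy state machine, not one of the real page parsers):

```python
def toy_fsm():
    """Toy sink: collect (label, value) rows, emit a dict at each 'TOTAL' row."""
    acc = {}
    line = yield              # primed with next(); rows then arrive via send()
    while True:
        if line[0] == "TOTAL":
            line = yield acc  # a complete record is handed back to the caller
            acc = {}
        else:
            acc[line[0]] = line[1]
            line = yield      # nothing complete yet, keep consuming


sink = toy_fsm()
next(sink)  # run the generator up to its first `yield`
for row in [("Loyers", "650.00"), ("Taxes", "12.00"), ("TOTAL", "")]:
    res = sink.send(row)
    if res:
        print(res)  # {'Loyers': '650.00', 'Taxes': '12.00'}
```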
pdf_oralia/pages/charge.py

@@ -1,9 +1,16 @@
 import re
+from pydantic import BaseModel, field_validator
 
-import numpy as np
-import pandas as pd
 
-RECAPITULATIF_DES_OPERATIONS = 1
+HEADER_CHARGE = [
+    "",
+    "RECAPITULATIF DES OPERATIONS",
+    "Débits",
+    "Crédits",
+    "Dont T.V.A.",
+    "Locatif",
+    "Déductible",
+]
 DF_TYPES = {
     "Fournisseur": str,
     "RECAPITULATIF DES OPERATIONS": str,
@@ -17,7 +24,30 @@ DF_TYPES = {
     "annee": str,
     "lot": str,
 }
-DEFAULT_FOURNISSEUR = "ROSIER MODICA MOTTEROZ SA"
+
+
+class Line(BaseModel):
+    mois: int
+    annee: int
+    immeuble: str
+    lot: str
+    Champs: str
+    Categorie: str
+    Fournisseur: str
+    Libellé: str
+    Débit: float
+    Crédits: float
+    Dont_TVA: float
+    Locatif: float
+    Déductible: float
+
+    @field_validator(
+        "Débit", "Crédits", "Dont_TVA", "Locatif", "Déductible", mode="before"
+    )
+    def set_default_if_empty(cls, v):
+        if v == "":
+            return 0
+        return v
 
 
 def is_it(page_text):
@@ -41,51 +71,54 @@ def get_lot(txt):
     return "*"
 
 
-def keep_row(row):
-    return not any(
-        [
-            word.lower() in row[RECAPITULATIF_DES_OPERATIONS].lower()
-            for word in ["TOTAL", "TOTAUX", "Solde créditeur", "Solde débiteur"]
-        ]
-    )
-
-
-def extract(table, additionnal_fields: dict = {}):
-    """Turn table to dictionary with additional fields"""
-    extracted = []
-    header = table[0]
-    for row in table[1:]:
-        if keep_row(row):
-            r = dict()
-            for i, value in enumerate(row):
-                if header[i] == "":
-                    r["Fournisseur"] = value
-                else:
-                    r[header[i]] = value
-
-            for k, v in additionnal_fields.items():
-                r[k] = v
-
-            if "honoraire" in row[RECAPITULATIF_DES_OPERATIONS].lower():
-                r["Fournisseur"] = DEFAULT_FOURNISSEUR
-
-            extracted.append(r)
-
-    return extracted
-
-
-def table2df(tables):
-    dfs = []
-    for table in tables:
-        df = (
-            pd.DataFrame.from_records(table)
-            .replace("", np.nan)
-            .dropna(subset=["Débits", "Crédits"], how="all")
-        )
-        df["Fournisseur"] = df["Fournisseur"].fillna(method="ffill")
-        dfs.append(df)
-    df = pd.concat(dfs)
-
-    df["immeuble"] = df["immeuble"].apply(lambda x: x[0].capitalize())
-    df["lot"] = df["RECAPITULATIF DES OPERATIONS"].apply(get_lot)
-    return df.astype(DF_TYPES)
+def fsm():
+    current_state = "total"
+    row = {}
+    line = yield
+    while True:
+        if line == HEADER_CHARGE:
+            line = yield
+        if current_state == "total":
+            if line[1].lower().split(" ")[0] in ["total", "totaux"]:
+                current_state = "new_champs"
+            line = yield
+        elif current_state == "new_champs":
+            if line[0] != "":
+                current_state = "new_cat_line"
+                row = {"Champs": line[0], "Categorie": "", "Fournisseur": ""}
+            line = yield
+        elif current_state == "new_cat_line":
+            if line[1].lower().split(" ")[0] in ["total", "totaux"]:
+                current_state = "new_champs"
+                line = yield
+                row = {}
+            elif line[2] != "" or line[3] != "":
+                row.update(
+                    {
+                        "Fournisseur": line[0] if line[0] != "" else row["Fournisseur"],
+                        "Libellé": line[1],
+                        "lot": get_lot(line[1]),
+                        "Débit": line[2],
+                        "Crédits": line[3],
+                        "Dont_TVA": line[4],
+                        "Locatif": line[5],
+                        "Déductible": line[6],
+                    }
+                )
+                line = yield row
+                row = {
+                    "Champs": row["Champs"],
+                    "Categorie": row["Categorie"],
+                    "Fournisseur": row["Fournisseur"],
+                }
+            elif line[0] != "" and line[1] == "":
+                row.update({"Categorie": line[0]})
+                line = yield
+            elif line[1] != "":
+                row.update({"Categorie": line[1]})
+                line = yield
+            elif line[0] != "":
+                row.update({"Fournisseur": line[0]})
+                line = yield
+            else:
+                line = yield
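The `Line` models lean on pydantic coercion: cells come out of pdfplumber as strings, and the `mode="before"` validator turns empty cells into `0` before the `float` conversion runs. A small standalone illustration of the same pattern with made-up values (a trimmed-down model, not the project's actual one):

```python
from pydantic import BaseModel, field_validator


class Amount(BaseModel):
    Libellé: str
    Débit: float
    Crédits: float

    @field_validator("Débit", "Crédits", mode="before")
    def set_default_if_empty(cls, v):
        # Empty table cells arrive as "" and would otherwise fail float coercion.
        if v == "":
            return 0
        return v


print(Amount(Libellé="NETTOYAGE PARTIES COMMUNES", Débit="79.57", Crédits=""))
# Libellé='NETTOYAGE PARTIES COMMUNES' Débit=79.57 Crédits=0.0
```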
pdf_oralia/pages/locataire.py

@@ -1,22 +1,48 @@
-import numpy as np
-import pandas as pd
-
-DF_TYPES = {
-    "Locataires": str,
-    "Période": str,
-    "Loyers": float,
-    "Taxes": float,
-    "Provisions": float,
-    "Divers": str,
-    "Total": float,
-    "Réglés": float,
-    "Impayés": float,
-    "immeuble": str,
-    "mois": str,
-    "annee": str,
-    "Lot": str,
-    "Type": str,
-}
+from pydantic import BaseModel, field_validator
+
+HEADER_LOC = [
+    "Locataires",
+    "Période",
+    "Loyers",
+    "Taxes",
+    "Provisions",
+    "Divers",
+    "",
+    "Total",
+    "Réglés",
+    "Impayés",
+]
+
+
+class Line(BaseModel):
+    mois: int
+    annee: int
+    immeuble: str
+    Lot: str
+    Type: str
+    Locataire: str
+    Loyers: float
+    Taxes: float
+    Provisions: float
+    Divers: float
+    Total: float
+    Réglés: float
+    Impayés: float
+
+    @field_validator(
+        "Loyers",
+        "Taxes",
+        "Provisions",
+        "Divers",
+        "Total",
+        "Réglés",
+        "Impayés",
+        mode="before",
+    )
+    def set_default_if_empty(cls, v):
+        if v == "":
+            return 0
+        return v
 
 
 def is_it(page_text):
@@ -25,142 +51,43 @@ def is_it(page_text):
     return False
 
 
-def is_drop(row):
-    if "totaux" in row[0].lower():
-        return True
-    if not any(row):
-        return True
-    return False
-
-
-def extract(table, additionnal_fields: dict = {}):
-    """Turn table to dictionary with additional fields"""
-    extracted = []
-    header = table[0]
-    for row in table[1:]:
-        if not is_drop(row):
-            r = dict()
-            for i, value in enumerate(row):
-                if header[i] != "":
-                    r[header[i]] = value
-            for k, v in additionnal_fields.items():
-                r[k] = v
-            extracted.append(r)
-    return extracted
-
-
-def join_row(last, next):
-    row = {}
-    for key in last:
-        if last[key] == next[key]:
-            row[key] = last[key]
-        elif last[key] and next[key]:
-            row[key] = f"{last[key]}\n{next[key]}"
-        elif last[key]:
-            row[key] = last[key]
-        elif next[key]:
-            row[key] = next[key]
-        else:
-            row[key] = ""
-    return row
-
-
-def join_tables(tables):
-    joined = tables[0]
-
-    for t in tables[1:]:
-        last_row = joined[-1]
-        if "totaux" not in last_row["Locataires"].lower():
-            first_row = t[0]
-            joined_row = join_row(last_row, first_row)
-            joined = joined[:-1] + [joined_row] + t[1:]
-        else:
-            joined += t
-
-    return joined
-
-
 def parse_lot(string):
     words = string.split(" ")
     return {"Lot": "{:02d}".format(int(words[1])), "Type": " ".join(words[2:])}
 
 
-def clean_type(string):
-    if "appartement" in string.lower():
-        return string[-2:]
-    return string
-
-
-def join_row(table):
-    joined = []
-    for row in table:
-        if row["Locataires"].startswith("Lot"):
-            row.update(parse_lot(row["Locataires"]))
-            row["Locataires"] = ""
-            joined.append(row)
-
-        elif row["Locataires"] == "Rappel de Loyer":
-            last_row = joined[-1]
-            row.update(
-                {
-                    "Lot": last_row["Lot"],
-                    "Type": last_row["Type"],
-                    "Locataires": last_row["Locataires"],
-                    "Divers": "Rappel de Loyer",
-                }
-            )
-            joined.append(row)
-
-        elif row["Locataires"]:
-            last_row = joined.pop()
-            row_name = row["Locataires"].replace("\n", " ")
-            row.update({k: v for k, v in last_row.items() if v})
-            row["Locataires"] = last_row["Locataires"] + " " + row_name
-            joined.append(row)
-
-        else:
-            if row["Période"].startswith("Solde"):
-                last_row = joined.pop()
-                row.update(
-                    {
-                        "Lot": last_row["Lot"],
-                        "Type": last_row["Type"],
-                        "Locataires": last_row["Locataires"],
-                    }
-                )
-                joined.append(row)
-
-            elif row["Période"].startswith("Du"):
-                last_row = joined[-1]
-                row.update(
-                    {
-                        "Lot": last_row["Lot"],
-                        "Type": last_row["Type"],
-                        "Locataires": last_row["Locataires"],
-                    }
-                )
-                joined.append(row)
-
-    return joined
-
-
-def flat_tables(tables):
-    tables_flat = []
-    for table in tables:
-        tables_flat.extend(table)
-    return tables_flat
-
-
-def table2df(tables):
-    tables = flat_tables(tables)
-    joined = join_row(tables)
-    df = pd.DataFrame.from_records(joined)
-
-    df["immeuble"] = df["immeuble"].apply(lambda x: x[0].capitalize())
-    df["Type"] = df["Type"].apply(clean_type)
-
-    numeric_cols = [k for k, v in DF_TYPES.items() if v == float]
-    df[numeric_cols] = df[numeric_cols].replace("", np.nan)
-
-    df = df.drop(df[(df["Locataires"] == "") & (df["Période"] == "")].index)
-
-    return df.astype(DF_TYPES)
+def fsm():
+    current_state = "new_row"
+    row = {}
+    line = yield
+    while True:
+        if line == HEADER_LOC:
+            line = yield
+        elif current_state == "new_row":
+            if line[0] != "" and line[0] != "TOTAUX":
+                row.update(parse_lot(line[0]))
+                current_state = "add_loc"
+            line = yield
+        elif current_state == "add_loc":
+            if line[0] != "":
+                row["Locataire"] = line[0]
+                current_state = "add_totaux"
+            line = yield
+        elif current_state == "add_totaux":
+            if line[0] == "Totaux":
+                row.update(
+                    {
+                        "Loyers": line[2],
+                        "Taxes": line[3],
+                        "Provisions": line[4],
+                        "Divers": line[5],
+                        "Total": line[7],
+                        "Réglés": line[8],
+                        "Impayés": line[9],
+                    }
+                )
+                line = yield row
+                row = {}
+                current_state = "new_row"
+            else:
+                line = yield
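Read together with `pdf_extract_tables_lines`, the three states (`new_row`, `add_loc`, `add_totaux`) consume one lot spread over several physical rows and only emit once the `Totaux` row arrives. A hedged usage sketch with made-up rows shaped like the ten-column `HEADER_LOC` table (real pages may pad their cells differently):

```python
from pdf_oralia.pages import locataire

sink = locataire.fsm()
next(sink)  # prime the coroutine

rows = [
    # lot line, tenant line, then the "Totaux" line carrying the amounts
    ["Lot 01 Appartement T2", "", "", "", "", "", "", "", "", ""],
    ["DUPONT Jean", "Du 01/01/2023 au 31/01/2023", "", "", "", "", "", "", "", ""],
    ["Totaux", "", "650.00", "0.00", "50.00", "0.00", "", "700.00", "700.00", "0.00"],
]
for row in rows:
    res = sink.send(row)
    if res:
        print(res["Lot"], res["Locataire"], res["Total"])  # 01 DUPONT Jean 700.00
```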
pdf_oralia/pages/patrimoine.py

@@ -1,4 +1,74 @@
+from pydantic import BaseModel, field_validator
+
+HEADER_PATRIMOINE = [
+    "Etage",
+    "Lots",
+    "Type de lot",
+    "Nom du Locataire",
+    "Loyer Annuel",
+    "Début Bail",
+    "Fin Bail",
+    "Entrée",
+    "Départ",
+    "Révisé le",
+    "U",
+    "Dépôt Gar.",
+]
+
+
+class Line(BaseModel):
+    mois: int
+    annee: int
+    immeuble: str
+    Etage: str
+    Lot: str
+    Type: str
+    Locataire: str
+    Loyer_annuel: int
+    Debut_bail: str
+    Fin_bail: str
+    Entree: str
+    Depart: str
+    Revision_bail: str
+    Usage: str
+    Depot_garantie: float
+
+    @field_validator("Loyer_annuel", "Depot_garantie", mode="before")
+    def set_default_if_empty(cls, v):
+        if v == "":
+            return 0
+        return v
+
+
 def is_it(page_text):
     if "VOTRE PATRIMOINE" in page_text:
         return True
     return False
+
+
+def fsm():
+    current_state = "new_line"
+    row = {}
+    line = yield
+    while True:
+        if line == HEADER_PATRIMOINE:
+            line = yield
+        if current_state == "new_line":
+            if line[0] != "":
+                row = {
+                    "Etage": line[0],
+                    "Lot": line[1][-2:] if line[1] != "" else row["Lot"],
+                    "Type": line[2] if line[2] != "" else row["Type"],
+                    "Locataire": line[3],
+                    "Loyer_annuel": line[4].replace(" ", ""),
+                    "Debut_bail": line[5],
+                    "Fin_bail": line[6],
+                    "Entree": line[7],
+                    "Depart": line[8],
+                    "Revision_bail": line[9],
+                    "Usage": line[10],
+                    "Depot_garantie": line[11].replace(" ", ""),
+                }
+                line = yield row
+            else:
+                line = yield
pdf_oralia CLI (click entry point)

@@ -4,7 +4,7 @@ from pathlib import Path
 
 import click
 
-from .extract import extract_save
+from .extract import extract_save, extract_plan
 from .join import join_excel
 
 
@@ -51,18 +51,45 @@ def on(pdf_file, dest):
 
 
 @extract.command()
-@click.option("--src", help="Tous les fichiers dans folder", default="./")
+@click.option(
+    "--src", help="Tous les fichiers dans folder (de façon récursive)", default="./"
+)
 @click.option("--dest", help="Où mettre les fichiers produits", default="./")
-def all(src, dest):
-    p = Path(src)
+@click.option(
+    "--only-plan",
+    help="Ne produit rien mais indique les changements",
+    default=False,
+    is_flag=True,
+)
+@click.option(
+    "--force",
+    help="Écrase les fichiers produits précédemment",
+    default=False,
+    is_flag=True,
+)
+def all(src, dest, force, only_plan):
+    src_path = Path(src)
 
-    d = Path(dest)
-    d.mkdir(exist_ok=True)
+    dest = Path(dest)
+    dest.mkdir(exist_ok=True)
 
-    pdf_files = [x for x in p.iterdir() if ".pdf" in str(x)]
-    for pdf_file in pdf_files:
+    for pdf_file in src_path.rglob("**/*.pdf"):
+        relative_path = pdf_file.relative_to(src_path)
+        files_dest = dest / relative_path.parent
         logging.info(f"Found {pdf_file}")
-        extract_save(pdf_file, d)
+
+        plan_dest = extract_plan(pdf_file, files_dest)
+        save = []
+        for k, p in plan_dest.items():
+            if not p.exists() or force:
+                save.append(k)
+
+        if only_plan:
+            for s in save:
+                logging.info(f"Planing to create {plan_dest[s]}")
+        else:
+            files_dest.mkdir(parents=True, exist_ok=True)
+            extract_save(pdf_file, files_dest, save)
 
 
 @main.command()
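The new `all` command mirrors the source tree under `--dest`: every pdf found by `rglob` keeps its relative parent directory on the output side, and `extract_plan` then decides per file which xlsx outputs are missing. A minimal sketch of that path arithmetic, with hypothetical folder names:

```python
from pathlib import Path

src_path = Path("archives")  # hypothetical --src
dest = Path("sorties")       # hypothetical --dest

pdf_file = Path("archives/2023/janvier/marietton.pdf")  # as yielded by src_path.rglob("**/*.pdf")
relative_path = pdf_file.relative_to(src_path)          # 2023/janvier/marietton.pdf
files_dest = dest / relative_path.parent                # sorties/2023/janvier
print(files_dest)
```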
requirements.txt

@@ -1,3 +1,5 @@
 pdfplumber
 numpy
 pandas
+click
+openpyxl