This commit is contained in:
2026-01-11 18:12:06 +01:00
parent 8da312fd33
commit 131aa7a112

199
app.py
View File

@@ -130,171 +130,77 @@ file_handler.setFormatter(logging.Formatter(
# return r
import re
from typing import List, Dict, Optional
# Parser for AHK-format weight reports (OCR'd text input).
# NOTE(review): the class body below is a corrupted old/new diff interleave —
# several methods appear twice in different revisions; reconcile before use.
class AHKParser:
    lab = "AHK"  # lab identifier tag used to label this parser's output
# ---------- Helpers ----------
def _norm(self, text: str) -> str:
    """Normalise whitespace/entities and remove typical OCR artefacts.

    Returns the cleaned text with runs of spaces collapsed and ends stripped.
    """
    cleaned = text
    # NOTE(review): the (" ", " ") pair is a no-op as written — possibly a
    # mangled double-space literal; reproduced as-is to preserve behavior.
    for needle, repl in (("\u00a0", " "), (" ", " "), ("**", " "), ("\t", " ")):
        cleaned = cleaned.replace(needle, repl)
    # Collapse two or more consecutive spaces into a single one.
    cleaned = re.sub(r"[ ]{2,}", " ", cleaned)
    return cleaned.strip()
def _lines(self, text):
    """Return the stripped, non-empty lines of *text*."""
    stripped = (line.strip() for line in text.splitlines())
    return [line for line in stripped if line]
def _safe_search(self, pat: str, text: str, flags=0) -> Optional[str]:
    """Return group(1) of the first match of *pat* in *text* (stripped), or None."""
    match = re.search(pat, text, flags)
    if match is None:
        return None
    return match.group(1).strip()
def _to_float(self, s: Optional[str]) -> Optional[float]:
    """Parse a numeric string such as '1,234 kg' or '- 3 %' into a float.

    Returns None for empty/None input or anything that still fails float().
    """
    if not s:
        return None
    # Remove thousands separators and unit markers, then trim.
    for token in (",", "kg", "%"):
        s = s.replace(token, "")
    s = s.strip()
    # Drop whitespace between a leading sign and the digits: "- 3" -> "-3".
    s = re.sub(r"^([+\-])\s+", r"\1", s)
    try:
        return float(s)
    except ValueError:
        return None
def _split_lines(self, text: str) -> List[str]:
    """Split *text* on CR/LF boundaries and return stripped, non-empty lines."""
    result: List[str] = []
    for raw in re.split(r"\r?\n", text):
        raw = raw.strip()
        if raw:
            result.append(raw)
    return result
# NOTE(review): corrupted diff interleave. The docstring below belongs to
# _take_next_colon_values, but its body is missing; the "def _col_block" header
# is from the NEW revision, and the while-loop after it is the ORPHANED body of
# _take_next_colon_values (it reads start_idx/count, which _col_block never
# defines). Reconstruct both methods from the original revisions.
def _take_next_colon_values(self, lines: List[str], start_idx: int, count: int) -> List[str]:
"""
Collect, starting after start_idx (exclusive), the next 'count' values that follow a ':'.
Tolerates several values on one line, e.g. ': A : B : C'.
"""
def _col_block(self, lines, labels, max_scan=25):
# find the last line of the label block
last = max(i for i,l in enumerate(lines) if l in labels)
vals = []
# NOTE(review): from here down this is _take_next_colon_values' body,
# not _col_block's — start_idx and count are undefined in this scope.
j = start_idx + 1
while j < len(lines) and len(vals) < count:
# grab every ':'-prefixed occurrence on the line
parts = re.findall(r":\s*([^:]+?)(?=\s*(?::|$))", lines[j])
for v in parts:
if len(vals) < count:
vals.append(v.strip())
j += 1
return vals
# NOTE(review): corrupted diff interleave. Most lines are the OLD
# _extract_group_by_headers, but the "for l in lines[last+1:...]" loop is the
# tail of the NEW _col_block (it reads last/max_scan/vals/labels, undefined
# here), and there are TWO return statements — one from each revision.
def _extract_group_by_headers(self, text: str, headers: List[str], anchor_regex: Optional[str]=None) -> Dict[str, str]:
"""
Find a line containing all the headers (in order) OR a supplied anchor,
then map the next N values (each starting with ':') onto the headers.
"""
lines = self._split_lines(self._norm(text))
# build a regex that enforces the header order
hdr_regex = r"\b" + r"\s+".join([re.escape(h) for h in headers]) + r"\b"
start_idx = None
for i, l in enumerate(lines):
if anchor_regex and re.search(anchor_regex, l, flags=re.I):
start_idx = i
# NOTE(review): the next five lines belong to the NEW _col_block, not here
for l in lines[last+1:last+1+max_scan]:
if l.startswith(":"):
vals.append(l[1:].strip())
if len(vals) == len(labels):
break
if re.search(hdr_regex, l):
start_idx = i
break
if start_idx is None:
return {}
values = self._take_next_colon_values(lines, start_idx, len(headers))
return {h: (values[idx] if idx < len(values) else None) for idx, h in enumerate(headers)}
# NOTE(review): unreachable — orphaned return from the NEW _col_block
return dict(zip(labels, vals))
# ---------- APIs compatible with your existing code ----------
def extract_table(self, text: str, headers: List[str]) -> Dict[str, str]:
    """Robust table extraction.

    Detects the grouped header row and maps the values that follow,
    in sequence, onto *headers* (delegates to _extract_group_by_headers).
    """
    mapped = self._extract_group_by_headers(text, headers)
    return mapped
def extract_weights(self, text: str, anchor: Optional[str]=None) -> Dict[str, str]:
    """
    Extract a Bales/Gross/Tare/Net weight block.

    - If *anchor* is given (e.g. 'Bales Weighed'), start from that anchor.
    - Otherwise locate the 'Bales Gross Tare Net' header line.

    Returns a header->value dict with the 'kg' unit suffix stripped;
    missing values stay None.
    """
    headers = ["Bales", "Gross", "Tare", "Net"]
    block = self._extract_group_by_headers(
        text, headers, anchor_regex=anchor if anchor else None)
    # Strip the unit marker from each weight value, preserving Nones.
    return {
        key: (None if value is None else value.replace("kg", "").strip())
        for key, value in block.items()
    }
# ---------- Main parse ----------
# NOTE(review): corrupted diff interleave — TWO versions of parse() are mixed
# from here to the end of the class. The OLD version builds `r` by hand,
# normalises into `T` and calls self._safe_search/self.extract_table; the NEW
# version (second "def parse" header below) uses self._lines -> L,
# empty_weight_report("AHK") and module-level safe_search/to_float helpers.
# Exactly one of the two must be kept.
def parse(self, text: str) -> dict:
# if you already have empty_weight_report(), reuse it
r = {
"report": {},
"contract": {},
"parties": {},
"shipment": {},
"weights": {}
}
T = self._norm(text)
# NOTE(review): nested second def header — start of the NEW implementation
def parse(self, text):
L = self._lines(text)
r = empty_weight_report("AHK")
# report
# Example PDF: "AHK S/790329/161112/PK" (there is a space after AHK)
r["report"]["reference"] = self._safe_search(r"(AHK\s+[A-Z0-9/]+)", T)
r["report"]["date"] = self._safe_search(r"Produced On\s*([0-9A-Za-z ]+)", T)
# NOTE(review): duplicate assignments from the NEW version (its reference
# regex additionally requires a "/" right after "AHK")
r["report"]["reference"] = safe_search(r"(AHK\s*/[A-Z0-9/]+)", text)
r["report"]["date"] = safe_search(r"Produced On\s*([0-9A-Za-z ]+)", text)
# Order details: "Client Client Ref No. Buyer" then the values
order = self.extract_table(T, ["Client", "Client Ref No.", "Buyer"])
r["contract"]["invoice_no"] = order.get("Client Ref No.") or \
self._safe_search(r"Client Reference:\s*([A-Z0-9\- /]+)", T)
r["parties"]["client"] = order.get("Client")
r["parties"]["buyer"] = order.get("Buyer")
# NOTE(review): continuation of the interleaved parse() bodies — OLD-version
# lines read T/ship/ship2 (extract_table); NEW-version lines read
# text/L/ship1/ship2 (_col_block, module-level helpers).
# contract
r["contract"]["invoice_no"] = safe_search(r"Client Reference:\s*([A-Z0-9\- /]+)", text)
r["contract"]["commodity"] = "Raw Cotton"
# Shipment info (2 grouped blocks)
ship = self.extract_table(T, ["Total Bales","Vessel","Voy. No.","B/L No.","B/L Date","Destination"])
ship2 = self.extract_table(T, ["Growth","Arrival Date","First date of weighing",
"Last Date of Weighing","Weighing method","Tare"])
# buyer
r["parties"]["buyer"] = safe_search(r"Buyer:\s*([A-Z0-9 ().,-]+)", text)
r["shipment"]["bales"] = self._to_float(ship.get("Total Bales"))
r["shipment"]["vessel"] = ship.get("Vessel")
r["shipment"]["voyage_no"] = ship.get("Voy. No.")
r["shipment"]["bl_no"] = ship.get("B/L No.")
r["shipment"]["bl_date"] = ship.get("B/L Date")
r["shipment"]["port_destination"] = ship.get("Destination")
# shipment block 1
ship1 = self._col_block(L, [
"Total Bales","Vessel","Voy. No.","B/L No.","B/L Date","Destination"
])
r["contract"]["origin"] = ship2.get("Growth")
r["shipment"]["arrival_date"] = ship2.get("Arrival Date")
r["shipment"]["first_weighing_date"] = ship2.get("First date of weighing")
r["shipment"]["last_weighing_date"] = ship2.get("Last Date of Weighing")
r["shipment"]["weighing_method"] = ship2.get("Weighing method")
# For AHK, "Tare: Invoice" denotes the tare basis, not a weight
r["shipment"]["tare_basis"] = ship2.get("Tare")
# shipment block 2
# NOTE(review): ship2 reassigned here by the NEW version (_col_block) —
# in this interleaved order the reads above used the OLD ship2
ship2 = self._col_block(L, [
"Growth","Arrival Date","First date of weighing",
"Last Date of Weighing","Weighing method","Tare"
])
# Weights
# NOTE(review): still the interleaved parse() bodies. OLD lines use
# self.extract_weights/self._to_float on T; NEW lines use _col_block,
# module-level to_float/safe_search and a section(text, start, end) helper.
# Block 1: invoice (right after the 'Bales Gross Tare Net' header)
inv = self.extract_weights(T) # no anchor -> the 1st occurrence
# Block 2: landed (anchored on 'Bales Weighed')
land = self.extract_weights(T, anchor=r"\bBales Weighed\b")
r["shipment"]["bales"] = to_float(ship1.get("Total Bales"))
r["shipment"]["vessel"] = ship1.get("Vessel")
r["shipment"]["bl_no"] = ship1.get("B/L No.")
r["shipment"]["port_destination"] = ship1.get("Destination")
r["shipment"]["arrival_date"] = ship2.get("Arrival Date")
r["shipment"]["weighing_method"] = ship2.get("Weighing method")
r["contract"]["origin"] = ship2.get("Growth")
r["weights"]["invoice_bales"] = self._to_float(inv.get("Bales"))
r["weights"]["invoice_gross_kg"] = self._to_float(inv.get("Gross"))
r["weights"]["invoice_tare_kg"] = self._to_float(inv.get("Tare"))
r["weights"]["invoice_net_kg"] = self._to_float(inv.get("Net"))
# invoice weights
# NOTE(review): NEW version reassigns inv/land and overwrites several keys
inv = self._col_block(L, ["Bales","Gross","Tare","Net"])
r["weights"]["invoice_net_kg"] = to_float(inv.get("Net"))
r["weights"]["landed_bales"] = self._to_float(land.get("Bales"))
r["weights"]["gross_landed_kg"] = self._to_float(land.get("Gross"))
r["weights"]["tare_kg"] = self._to_float(land.get("Tare"))
r["weights"]["net_landed_kg"] = self._to_float(land.get("Net"))
# landed weights
land = self._col_block(self._lines(section(text,"Bales Weighed","Outturn")),
["Bales","Gross","Tare","Net"])
# Loss / Outturn
loss_sec = T # if you have section(text, "LOSS", "Invoice average"), substitute your function
r["weights"]["gain_loss_kg"] = self._to_float(self._safe_search(r"LOSS.*?(-?\s*\d+\.?\d*)\s*kg", loss_sec, flags=re.S))
r["weights"]["gain_loss_percent"] = self._to_float(self._safe_search(r"Percentage\s*:\s*([\-+]?\s*\d+\.?\d*)", loss_sec))
r["weights"]["gross_landed_kg"] = to_float(land.get("Gross"))
r["weights"]["tare_kg"] = to_float(land.get("Tare"))
r["weights"]["net_landed_kg"] = to_float(land.get("Net"))
# loss
loss = section(text,"LOSS","Invoice average")
r["weights"]["gain_loss_kg"] = to_float(safe_search(r"(-?\d+\.?\d*)\s*kg", loss))
r["weights"]["gain_loss_percent"] = to_float(safe_search(r"Percentage\s*:\s*(-?\d+\.?\d*)", loss))
return r
class IntertekParser:
lab="INTERTEK"
def parse(self,text):
@@ -644,8 +550,7 @@ async def ocr(file: UploadFile):
return {
"success": True,
# "ocr_text": ocr_text[:1000] + "..." if len(ocr_text) > 1000 else ocr_text,
"ocr_text": structured_data,
"ocr_text": ocr_text[:1000] + "..." if len(ocr_text) > 1000 else ocr_text,
"structured_data": structured_data,
"json": json.dumps(structured_data, indent=2, ensure_ascii=False)
}