diff --git a/app.py b/app.py index 845b759..e57a5d4 100644 --- a/app.py +++ b/app.py @@ -130,171 +130,77 @@ file_handler.setFormatter(logging.Formatter( # return r -import re -from typing import List, Dict, Optional - class AHKParser: lab = "AHK" - # ---------- Helpers ---------- - def _norm(self, text: str) -> str: - # Normalise espaces/entités, supprime artefacts typiques d'OCR - t = (text.replace("\u00a0", " ") - .replace(" ", " ") - .replace("**", " ") - .replace("\t", " ")) - # Supprime espaces multiples - t = re.sub(r"[ ]{2,}", " ", t) - # Aligne "Page of" etc. (inutile au parsing) - return t.strip() + def _lines(self, text): + return [l.strip() for l in text.splitlines() if l.strip()] - def _safe_search(self, pat: str, text: str, flags=0) -> Optional[str]: - m = re.search(pat, text, flags) - return m.group(1).strip() if m else None - - def _to_float(self, s: Optional[str]) -> Optional[float]: - if not s: - return None - s = s.replace(",", "").replace("kg", "").replace("%", "").strip() - # enlève éventuels espaces après le signe - s = re.sub(r"^([+\-])\s+", r"\1", s) - try: - return float(s) - except ValueError: - return None - - def _split_lines(self, text: str) -> List[str]: - lines = [l.strip() for l in re.split(r"\r?\n", text) if l.strip()] - return lines - - def _take_next_colon_values(self, lines: List[str], start_idx: int, count: int) -> List[str]: - """ - Récupère, à partir de start_idx (exclu), les 'count' prochaines valeurs qui suivent un ':'. - Tolère plusieurs valeurs sur la même ligne: ex ': A : B : C' - """ + def _col_block(self, lines, labels, max_scan=25): + # trouve la dernière ligne du bloc de labels + last = max(i for i,l in enumerate(lines) if l in labels) vals = [] - j = start_idx + 1 - while j < len(lines) and len(vals) < count: - # attrape toutes les occurrences sur la ligne - parts = re.findall(r":\s*([^:]+?)(?=\s*(?::|$))", lines[j]) - for v in parts: - if len(vals) < count: - vals.append(v.strip()) - j += 1 - return vals - - def _extract_group_by_headers(self, text: str, headers: List[str], anchor_regex: Optional[str]=None) -> Dict[str, str]: - """ - Trouve une ligne contenant tous les headers (dans l'ordre) OU une ancre fournie, - puis mappe les N valeurs suivantes (débutant par ':') aux headers. - """ - lines = self._split_lines(self._norm(text)) - # construire regex qui force l'ordre des headers - hdr_regex = r"\b" + r"\s+".join([re.escape(h) for h in headers]) + r"\b" - start_idx = None - for i, l in enumerate(lines): - if anchor_regex and re.search(anchor_regex, l, flags=re.I): - start_idx = i + for l in lines[last+1:last+1+max_scan]: + if l.startswith(":"): + vals.append(l[1:].strip()) + if len(vals) == len(labels): break - if re.search(hdr_regex, l): - start_idx = i - break - if start_idx is None: - return {} - values = self._take_next_colon_values(lines, start_idx, len(headers)) - return {h: (values[idx] if idx < len(values) else None) for idx, h in enumerate(headers)} + return dict(zip(labels, vals)) - # ---------- API compatibles avec ton code ---------- - def extract_table(self, text: str, headers: List[str]) -> Dict[str, str]: - # version robuste: détecte headers groupés et prend les valeurs en séquence - return self._extract_group_by_headers(text, headers) - - def extract_weights(self, text: str, anchor: Optional[str]=None) -> Dict[str, str]: - """ - Extrait un bloc de poids Bales/Gross/Tare/Net. - - Si anchor est défini (ex. 'Bales Weighed'), on part de cette ancre. - - Sinon on cherche la ligne d'en-têtes 'Bales Gross Tare Net'. - """ - headers = ["Bales", "Gross", "Tare", "Net"] - block = self._extract_group_by_headers(text, headers, - anchor_regex=anchor if anchor else None) - # nettoyage des unités pour les poids - clean = {} - for k, v in block.items(): - if v is None: - clean[k] = None - else: - clean[k] = v.replace("kg", "").strip() - return clean - - # ---------- Parse principal ---------- - def parse(self, text: str) -> dict: - # si tu as déjà empty_weight_report(), réutilise-le - r = { - "report": {}, - "contract": {}, - "parties": {}, - "shipment": {}, - "weights": {} - } - - T = self._norm(text) + def parse(self, text): + L = self._lines(text) + r = empty_weight_report("AHK") # report - # Exemple PDF: "AHK S/790329/161112/PK" (il y a un espace après AHK) - r["report"]["reference"] = self._safe_search(r"(AHK\s+[A-Z0-9/]+)", T) - r["report"]["date"] = self._safe_search(r"Produced On\s*([0-9A-Za-z ]+)", T) + r["report"]["reference"] = safe_search(r"(AHK\s*/[A-Z0-9/]+)", text) + r["report"]["date"] = safe_search(r"Produced On\s*([0-9A-Za-z ]+)", text) - # Order details: "Client Client Ref No. Buyer" puis valeurs - order = self.extract_table(T, ["Client", "Client Ref No.", "Buyer"]) - r["contract"]["invoice_no"] = order.get("Client Ref No.") or \ - self._safe_search(r"Client Reference:\s*([A-Z0-9\- /]+)", T) - r["parties"]["client"] = order.get("Client") - r["parties"]["buyer"] = order.get("Buyer") + # contract + r["contract"]["invoice_no"] = safe_search(r"Client Reference:\s*([A-Z0-9\- /]+)", text) + r["contract"]["commodity"] = "Raw Cotton" - # Infos expédition (2 blocs groupés) - ship = self.extract_table(T, ["Total Bales","Vessel","Voy. No.","B/L No.","B/L Date","Destination"]) - ship2 = self.extract_table(T, ["Growth","Arrival Date","First date of weighing", - "Last Date of Weighing","Weighing method","Tare"]) + # buyer + r["parties"]["buyer"] = safe_search(r"Buyer:\s*([A-Z0-9 ().,-]+)", text) - r["shipment"]["bales"] = self._to_float(ship.get("Total Bales")) - r["shipment"]["vessel"] = ship.get("Vessel") - r["shipment"]["voyage_no"] = ship.get("Voy. No.") - r["shipment"]["bl_no"] = ship.get("B/L No.") - r["shipment"]["bl_date"] = ship.get("B/L Date") - r["shipment"]["port_destination"] = ship.get("Destination") + # shipment block 1 + ship1 = self._col_block(L, [ + "Total Bales","Vessel","Voy. No.","B/L No.","B/L Date","Destination" + ]) - r["contract"]["origin"] = ship2.get("Growth") - r["shipment"]["arrival_date"] = ship2.get("Arrival Date") - r["shipment"]["first_weighing_date"] = ship2.get("First date of weighing") - r["shipment"]["last_weighing_date"] = ship2.get("Last Date of Weighing") - r["shipment"]["weighing_method"] = ship2.get("Weighing method") - # Chez AHK, "Tare: Invoice" indique la base de tare, pas un poids - r["shipment"]["tare_basis"] = ship2.get("Tare") + # shipment block 2 + ship2 = self._col_block(L, [ + "Growth","Arrival Date","First date of weighing", + "Last Date of Weighing","Weighing method","Tare" + ]) - # Poids - # Bloc 1: invoice (juste après l'en-tête 'Bales Gross Tare Net') - inv = self.extract_weights(T) # sans ancre -> la 1ère occurrence - # Bloc 2: landed (ancré sur 'Bales Weighed') - land = self.extract_weights(T, anchor=r"\bBales Weighed\b") + r["shipment"]["bales"] = to_float(ship1.get("Total Bales")) + r["shipment"]["vessel"] = ship1.get("Vessel") + r["shipment"]["bl_no"] = ship1.get("B/L No.") + r["shipment"]["port_destination"] = ship1.get("Destination") + r["shipment"]["arrival_date"] = ship2.get("Arrival Date") + r["shipment"]["weighing_method"] = ship2.get("Weighing method") + r["contract"]["origin"] = ship2.get("Growth") - r["weights"]["invoice_bales"] = self._to_float(inv.get("Bales")) - r["weights"]["invoice_gross_kg"] = self._to_float(inv.get("Gross")) - r["weights"]["invoice_tare_kg"] = self._to_float(inv.get("Tare")) - r["weights"]["invoice_net_kg"] = self._to_float(inv.get("Net")) + # invoice weights + inv = self._col_block(L, ["Bales","Gross","Tare","Net"]) + r["weights"]["invoice_net_kg"] = to_float(inv.get("Net")) - r["weights"]["landed_bales"] = self._to_float(land.get("Bales")) - r["weights"]["gross_landed_kg"] = self._to_float(land.get("Gross")) - r["weights"]["tare_kg"] = self._to_float(land.get("Tare")) - r["weights"]["net_landed_kg"] = self._to_float(land.get("Net")) + # landed weights + land = self._col_block(self._lines(section(text,"Bales Weighed","Outturn")), + ["Bales","Gross","Tare","Net"]) - # Loss / Outturn - loss_sec = T # si tu as section(text, "LOSS", "Invoice average"), remplace par ta fonction - r["weights"]["gain_loss_kg"] = self._to_float(self._safe_search(r"LOSS.*?(-?\s*\d+\.?\d*)\s*kg", loss_sec, flags=re.S)) - r["weights"]["gain_loss_percent"] = self._to_float(self._safe_search(r"Percentage\s*:\s*([\-+]?\s*\d+\.?\d*)", loss_sec)) + r["weights"]["gross_landed_kg"] = to_float(land.get("Gross")) + r["weights"]["tare_kg"] = to_float(land.get("Tare")) + r["weights"]["net_landed_kg"] = to_float(land.get("Net")) + + # loss + loss = section(text,"LOSS","Invoice average") + r["weights"]["gain_loss_kg"] = to_float(safe_search(r"(-?\d+\.?\d*)\s*kg", loss)) + r["weights"]["gain_loss_percent"] = to_float(safe_search(r"Percentage\s*:\s*(-?\d+\.?\d*)", loss)) return r + class IntertekParser: lab="INTERTEK" def parse(self,text): @@ -644,8 +550,7 @@ async def ocr(file: UploadFile): return { "success": True, - # "ocr_text": ocr_text[:1000] + "..." if len(ocr_text) > 1000 else ocr_text, - "ocr_text": structured_data, + "ocr_text": ocr_text[:1000] + "..." if len(ocr_text) > 1000 else ocr_text, "structured_data": structured_data, "json": json.dumps(structured_data, indent=2, ensure_ascii=False) }