Files
automation-service/app.py
2026-01-11 16:57:23 +01:00

754 lines
30 KiB
Python

from fastapi import FastAPI, UploadFile, HTTPException, Body
from PIL import Image
import pytesseract
from doctr.models import ocr_predictor
from doctr.io import DocumentFile
from PyPDF2 import PdfReader
import camelot
import spacy
import logging
import io
from logging.handlers import RotatingFileHandler
import re
LOG_PATH = "/var/log/automation-service.log"
file_handler = RotatingFileHandler(
LOG_PATH,
maxBytes=10*1024*1024,
backupCount=5,
encoding="utf-8"
)
file_handler.setFormatter(logging.Formatter(
"%(asctime)s - %(levelname)s - %(name)s - %(message)s"
))
# class AHKParser:
# lab="AHK"
# def parse(self,text):
# r=empty_weight_report("AHK")
# inv=section(text,"INVOICE WEIGHTS","Bales Weighed")
# land=section(text,"Bales Weighed","Outturn")
# loss=section(text,"LOSS","Invoice average")
# r["report"]["reference"]=safe_search(r"(AHK\s*/\S+)",text)
# r["report"]["date"]=extract("Produced On",text)
# r["contract"]["invoice_no"]=extract("Client Reference",text)
# r["contract"]["origin"]=extract("Growth",text)
# r["contract"]["commodity"]="Raw Cotton"
# r["parties"]["seller"]=extract("Client",text)
# r["parties"]["buyer"]=extract("Buyer",text)
# r["shipment"]["vessel"]=extract("Vessel",text)
# r["shipment"]["bl_no"]=extract("B/L No",text)
# r["shipment"]["port_destination"]=extract("Destination",text)
# r["shipment"]["arrival_date"]=extract("Arrival Date",text)
# r["shipment"]["weighing_method"]=extract("Weighing method",text)
# r["shipment"]["bales"]=to_float(extract("Total Bales",text))
# r["weights"]["gross_landed_kg"]=to_float(extract("Gross",land))
# r["weights"]["tare_kg"]=to_float(extract("Tare",land))
# r["weights"]["net_landed_kg"]=to_float(extract("Net",land))
# r["weights"]["invoice_net_kg"]=to_float(extract("Net",inv))
# r["weights"]["gain_loss_kg"]=to_float(extract("kg",loss))
# r["weights"]["gain_loss_percent"]=to_float(extract("Percentage",loss))
# return r
# class AHKParser:
# lab = "AHK"
# def extract_table(self, text, headers):
# lines = [l.strip() for l in text.splitlines() if l.strip()]
# out = {}
# for h in headers:
# for i,l in enumerate(lines):
# if l == h:
# for j in range(i+1, i+8):
# if j < len(lines) and lines[j].startswith(":"):
# out[h] = lines[j][1:].strip()
# break
# return out
# def extract_weights(self, text):
# lines = [l.strip() for l in text.splitlines() if l.strip()]
# res = {}
# for i,l in enumerate(lines):
# if l == "Bales Weighed":
# headers = ["Bales","Gross","Tare","Net"]
# for h in headers:
# for j in range(i, i+20):
# if j < len(lines) and lines[j].startswith(":"):
# res[h] = lines[j][1:].replace("kg","").strip()
# break
# return res
# def parse(self, text):
# r = empty_weight_report("AHK")
# # report
# r["report"]["reference"] = safe_search(r"(AHK\s*/[A-Z0-9/]+)", text)
# r["report"]["date"] = safe_search(r"Produced On\s*([0-9A-Za-z ]+)", text)
# # contract
# r["contract"]["invoice_no"] = safe_search(r"Client Reference:\s*([A-Z0-9\- /]+)", text)
# r["contract"]["commodity"] = "Raw Cotton"
# # buyer
# r["parties"]["buyer"] = safe_search(r"Buyer:\s*([A-Z0-9 ().,-]+)", text)
# # shipment tables
# ship = self.extract_table(text, [
# "Total Bales","Vessel","Voy. No.","B/L No.","B/L Date","Destination"
# ])
# ship2 = self.extract_table(text, [
# "Growth","Arrival Date","First date of weighing",
# "Last Date of Weighing","Weighing method","Tare"
# ])
# r["shipment"]["bales"] = to_float(ship.get("Total Bales"))
# r["shipment"]["vessel"] = ship.get("Vessel")
# r["shipment"]["bl_no"] = ship.get("B/L No.")
# r["shipment"]["port_destination"] = ship.get("Destination")
# r["shipment"]["arrival_date"] = ship2.get("Arrival Date")
# r["shipment"]["weighing_method"] = ship2.get("Weighing method")
# r["contract"]["origin"] = ship2.get("Growth")
# # weights
# inv = self.extract_table(text, ["Bales","Gross","Tare","Net"])
# land = self.extract_weights(text)
# r["weights"]["invoice_net_kg"] = to_float(inv.get("Net"))
# r["weights"]["gross_landed_kg"] = to_float(land.get("Gross"))
# r["weights"]["tare_kg"] = to_float(land.get("Tare"))
# r["weights"]["net_landed_kg"] = to_float(land.get("Net"))
# # loss
# loss = section(text,"LOSS","Invoice average")
# r["weights"]["gain_loss_kg"] = to_float(safe_search(r"(-?\d+\.?\d*)\s*kg", loss))
# r["weights"]["gain_loss_percent"] = to_float(safe_search(r"Percentage\s*:\s*(-?\d+\.?\d*)", loss))
# return r
import re
from typing import List, Dict, Optional
class AHKParser:
lab = "AHK"
# ---------- Helpers ----------
def _norm(self, text: str) -> str:
# Normalise espaces/entités, supprime artefacts typiques d'OCR
t = (text.replace("\u00a0", " ")
.replace("&nbsp;", " ")
.replace("**", " ")
.replace("\t", " "))
# Supprime espaces multiples
t = re.sub(r"[ ]{2,}", " ", t)
# Aligne "Page of" etc. (inutile au parsing)
return t.strip()
def _safe_search(self, pat: str, text: str, flags=0) -> Optional[str]:
m = re.search(pat, text, flags)
return m.group(1).strip() if m else None
def _to_float(self, s: Optional[str]) -> Optional[float]:
if not s:
return None
s = s.replace(",", "").replace("kg", "").replace("%", "").strip()
# enlève éventuels espaces après le signe
s = re.sub(r"^([+\-])\s+", r"\1", s)
try:
return float(s)
except ValueError:
return None
def _split_lines(self, text: str) -> List[str]:
lines = [l.strip() for l in re.split(r"\r?\n", text) if l.strip()]
return lines
def _take_next_colon_values(self, lines: List[str], start_idx: int, count: int) -> List[str]:
"""
Récupère, à partir de start_idx (exclu), les 'count' prochaines valeurs qui suivent un ':'.
Tolère plusieurs valeurs sur la même ligne: ex ': A : B : C'
"""
vals = []
j = start_idx + 1
while j < len(lines) and len(vals) < count:
# attrape toutes les occurrences sur la ligne
parts = re.findall(r":\s*([^:]+?)(?=\s*(?::|$))", lines[j])
for v in parts:
if len(vals) < count:
vals.append(v.strip())
j += 1
return vals
def _extract_group_by_headers(self, text: str, headers: List[str], anchor_regex: Optional[str]=None) -> Dict[str, str]:
"""
Trouve une ligne contenant tous les headers (dans l'ordre) OU une ancre fournie,
puis mappe les N valeurs suivantes (débutant par ':') aux headers.
"""
lines = self._split_lines(self._norm(text))
# construire regex qui force l'ordre des headers
hdr_regex = r"\b" + r"\s+".join([re.escape(h) for h in headers]) + r"\b"
start_idx = None
for i, l in enumerate(lines):
if anchor_regex and re.search(anchor_regex, l, flags=re.I):
start_idx = i
break
if re.search(hdr_regex, l):
start_idx = i
break
if start_idx is None:
return {}
values = self._take_next_colon_values(lines, start_idx, len(headers))
return {h: (values[idx] if idx < len(values) else None) for idx, h in enumerate(headers)}
# ---------- API compatibles avec ton code ----------
def extract_table(self, text: str, headers: List[str]) -> Dict[str, str]:
# version robuste: détecte headers groupés et prend les valeurs en séquence
return self._extract_group_by_headers(text, headers)
def extract_weights(self, text: str, anchor: Optional[str]=None) -> Dict[str, str]:
"""
Extrait un bloc de poids Bales/Gross/Tare/Net.
- Si anchor est défini (ex. 'Bales Weighed'), on part de cette ancre.
- Sinon on cherche la ligne d'en-têtes 'Bales Gross Tare Net'.
"""
headers = ["Bales", "Gross", "Tare", "Net"]
block = self._extract_group_by_headers(text, headers,
anchor_regex=anchor if anchor else None)
# nettoyage des unités pour les poids
clean = {}
for k, v in block.items():
if v is None:
clean[k] = None
else:
clean[k] = v.replace("kg", "").strip()
return clean
# ---------- Parse principal ----------
def parse(self, text: str) -> dict:
# si tu as déjà empty_weight_report(), réutilise-le
r = {
"report": {},
"contract": {},
"parties": {},
"shipment": {},
"weights": {}
}
T = self._norm(text)
# report
# Exemple PDF: "AHK S/790329/161112/PK" (il y a un espace après AHK)
r["report"]["reference"] = self._safe_search(r"(AHK\s+[A-Z0-9/]+)", T)
r["report"]["date"] = self._safe_search(r"Produced On\s*([0-9A-Za-z ]+)", T)
# Order details: "Client Client Ref No. Buyer" puis valeurs
order = self.extract_table(T, ["Client", "Client Ref No.", "Buyer"])
r["contract"]["invoice_no"] = order.get("Client Ref No.") or \
self._safe_search(r"Client Reference:\s*([A-Z0-9\- /]+)", T)
r["parties"]["client"] = order.get("Client")
r["parties"]["buyer"] = order.get("Buyer")
# Infos expédition (2 blocs groupés)
ship = self.extract_table(T, ["Total Bales","Vessel","Voy. No.","B/L No.","B/L Date","Destination"])
ship2 = self.extract_table(T, ["Growth","Arrival Date","First date of weighing",
"Last Date of Weighing","Weighing method","Tare"])
r["shipment"]["bales"] = self._to_float(ship.get("Total Bales"))
r["shipment"]["vessel"] = ship.get("Vessel")
r["shipment"]["voyage_no"] = ship.get("Voy. No.")
r["shipment"]["bl_no"] = ship.get("B/L No.")
r["shipment"]["bl_date"] = ship.get("B/L Date")
r["shipment"]["port_destination"] = ship.get("Destination")
r["contract"]["origin"] = ship2.get("Growth")
r["shipment"]["arrival_date"] = ship2.get("Arrival Date")
r["shipment"]["first_weighing_date"] = ship2.get("First date of weighing")
r["shipment"]["last_weighing_date"] = ship2.get("Last Date of Weighing")
r["shipment"]["weighing_method"] = ship2.get("Weighing method")
# Chez AHK, "Tare: Invoice" indique la base de tare, pas un poids
r["shipment"]["tare_basis"] = ship2.get("Tare")
# Poids
# Bloc 1: invoice (juste après l'en-tête 'Bales Gross Tare Net')
inv = self.extract_weights(T) # sans ancre -> la 1ère occurrence
# Bloc 2: landed (ancré sur 'Bales Weighed')
land = self.extract_weights(T, anchor=r"\bBales Weighed\b")
r["weights"]["invoice_bales"] = self._to_float(inv.get("Bales"))
r["weights"]["invoice_gross_kg"] = self._to_float(inv.get("Gross"))
r["weights"]["invoice_tare_kg"] = self._to_float(inv.get("Tare"))
r["weights"]["invoice_net_kg"] = self._to_float(inv.get("Net"))
r["weights"]["landed_bales"] = self._to_float(land.get("Bales"))
r["weights"]["gross_landed_kg"] = self._to_float(land.get("Gross"))
r["weights"]["tare_kg"] = self._to_float(land.get("Tare"))
r["weights"]["net_landed_kg"] = self._to_float(land.get("Net"))
# Loss / Outturn
loss_sec = T # si tu as section(text, "LOSS", "Invoice average"), remplace par ta fonction
r["weights"]["gain_loss_kg"] = self._to_float(self._safe_search(r"LOSS.*?(-?\s*\d+\.?\d*)\s*kg", loss_sec, flags=re.S))
r["weights"]["gain_loss_percent"] = self._to_float(self._safe_search(r"Percentage\s*:\s*([\-+]?\s*\d+\.?\d*)", loss_sec))
return r
class IntertekParser:
lab="INTERTEK"
def parse(self,text):
r=empty_weight_report("INTERTEK")
pct=safe_search(r"([0-9.]+)\s*%",text)
r["report"]["reference"]=extract("Global Ref",text)
r["report"]["file_no"]=extract("Report / File No",text)
r["report"]["date"]=extract("Dated",text)
r["contract"]["contract_no"]=extract("Contract No",text)
r["contract"]["invoice_no"]=extract("Invoice No",text)
r["contract"]["origin"]=extract("Growth",text)
r["contract"]["commodity"]="Raw Cotton"
r["parties"]["buyer"]=extract("Buyer",text)
r["shipment"]["vessel"]=extract("Vessel",text)
r["shipment"]["bl_no"]=extract("B/L No",text)
r["shipment"]["arrival_date"]=extract("Arrival Date",text)
r["shipment"]["weighing_place"]=extract("Weighed at",text)
r["shipment"]["bales"]=to_float(extract("Invoice Quantity",text))
r["weights"]["gross_landed_kg"]=to_float(extract("Gross",text))
r["weights"]["tare_kg"]=to_float(extract("Invoice Tare",text))
r["weights"]["net_landed_kg"]=to_float(extract("Landed Weight",text))
r["weights"]["invoice_net_kg"]=to_float(extract("Invoice Weight",text))
r["weights"]["gain_loss_kg"]=to_float(extract("Gain",text))
r["weights"]["gain_loss_percent"]=to_float(pct)
return r
class RobertsonParser:
lab="ROBERTSON"
def parse(self,text):
r=empty_weight_report("ROBERTSON")
pct=safe_search(r"([0-9.]+)\s*%",text)
r["report"]["reference"]=extract("OUR REF",text)
r["report"]["date"]=extract("DATE",text)
r["contract"]["contract_no"]=extract("CONTRACT NO",text)
r["contract"]["invoice_no"]=extract("INVOICE NO",text)
r["contract"]["lc_no"]=extract("LIC NO",text)
r["contract"]["commodity"]="Raw Cotton"
r["parties"]["seller"]=extract("SELLER",text)
r["parties"]["buyer"]=extract("BUYER",text)
r["shipment"]["vessel"]=extract("NAME OF VESSEL",text)
r["shipment"]["port_loading"]=extract("SAILED FROM",text)
r["shipment"]["port_destination"]=extract("ARRIVED AT",text)
r["shipment"]["arrival_date"]=extract("DATE OF ARRIVAL",text)
r["shipment"]["weighing_place"]=extract("PLACE OF CONTROL",text)
r["shipment"]["bales"]=to_float(extract("CONSIGNMENT",text))
r["weights"]["gross_landed_kg"]=to_float(extract("GROSS",text))
r["weights"]["tare_kg"]=to_float(extract("TARE",text))
r["weights"]["net_landed_kg"]=to_float(extract("LANDED NET",text))
r["weights"]["invoice_net_kg"]=to_float(extract("INVOICE NET",text))
r["weights"]["gain_loss_kg"]=to_float(extract("GAIN",text))
r["weights"]["gain_loss_percent"]=to_float(pct)
return r
class SGSParser:
lab="SGS"
def parse(self,text):
r=empty_weight_report("SGS")
r["report"]["reference"]=extract("LANDING REPORT No",text)
r["report"]["file_no"]=extract("FILE NO.",text)
r["report"]["date"]=extract("DATE",text)
r["contract"]["contract_no"]=extract("CONTRACT NO.",text)
r["contract"]["invoice_no"]=extract("INVOICE NO.",text)
r["contract"]["origin"]=extract("ORIGIN",text)
r["contract"]["commodity"]=extract("PRODUCT",text)
r["parties"]["seller"]=extract("Seller",text)
r["parties"]["buyer"]=extract("Buyer",text)
r["parties"]["carrier"]=extract("Carrier",text)
r["shipment"]["bl_no"]=extract("B/L no.",text)
r["shipment"]["port_loading"]=extract("Port of loading",text)
r["shipment"]["port_destination"]=extract("Port of destination",text)
r["shipment"]["arrival_date"]=extract("Vessel arrival date",text)
r["shipment"]["weighing_place"]=extract("Place of weighing",text)
r["shipment"]["weighing_method"]=extract("Weighing mode",text)
r["shipment"]["bales"]=to_float(extract("Quantity arrived",text))
r["weights"]["gross_landed_kg"]=to_float(extract("Gross landed",text))
r["weights"]["tare_kg"]=to_float(extract("Tare",text))
r["weights"]["net_landed_kg"]=to_float(extract("Net landed",text))
r["weights"]["invoice_net_kg"]=to_float(extract("Net invoiced",text))
r["weights"]["gain_loss_kg"]=to_float(safe_search(r"Gain.*?([0-9.,]+)\s*kgs",text))
r["weights"]["gain_loss_percent"]=to_float(safe_search(r"Gain\s*\+?\s*([0-9.,]+)\s*%",text))
return r
class PICLParser:
lab="PICL"
def parse(self,text):
r=empty_weight_report("PICL")
r["report"]["reference"]=safe_search(r"No[:\s]+([A-Z0-9\-]+)",text)
r["report"]["date"]=safe_search(r"(Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday),?\s*([A-Za-z]+\s+[0-9]{1,2},\s*[0-9]{4})",text,group_index=2)
r["contract"]["contract_no"]=extract("Contract/Pl No & Date",text)
r["contract"]["invoice_no"]=extract("Invoice ilo & Date",text)
r["contract"]["lc_no"]=extract("L/C No & Date",text)
r["contract"]["origin"]=extract("Country of Origin",text)
r["contract"]["commodity"]=extract("Commodity",text)
r["parties"]["seller"]=extract("FAIRCOT SA",text)
r["parties"]["buyer"]=extract("M/S.",text)
r["parties"]["carrier"]=extract("Shipping Agent",text)
r["shipment"]["vessel"]=extract("Shipped Per Vessel",text)
r["shipment"]["bl_no"]=extract("B/L No & Date",text)
r["shipment"]["port_loading"]=extract("Port of Loading",text)
r["shipment"]["port_destination"]=extract("Port of Discharge",text)
r["shipment"]["arrival_date"]=extract("Date of Anival & LDL",text)
r["shipment"]["weighing_place"]=extract("Place & Date of Weighment",text)
r["shipment"]["weighing_method"]=extract("Method of Weighment",text)
r["shipment"]["bales"]=to_float(extract("Grand Total",text))
r["weights"]["gross_landed_kg"]=to_float(extract("Total;",text))
r["weights"]["tare_kg"]=to_float(extract("Tare Weight",text))
r["weights"]["net_landed_kg"]=to_float(extract("Grand Total",text))
r["weights"]["invoice_net_kg"]=to_float(extract("Invoice weight",text))
r["weights"]["gain_loss_kg"]=to_float(safe_search(r"(-[0-9.,]+)\s*KGS",text))
r["weights"]["gain_loss_percent"]=to_float(safe_search(r"\(\s*([0-9.,]+)\s*o/o\s*\)",text))
return r
# Configure root logger explicitly
root = logging.getLogger()
root.setLevel(logging.INFO)
root.addHandler(file_handler)
root.addHandler(logging.StreamHandler())
# Use root logger for your app
logger = logging.getLogger(__name__)
app = FastAPI()
logger.info("Loading models...")
nlp = spacy.load("en_core_web_sm")
predictor = ocr_predictor(pretrained=True)
logger.info("Models loaded successfully.")
# =============================
# 🧠 Smart OCR
# =============================
@app.post("/ocr")
async def ocr(file: UploadFile):
logger.info(f"Received OCR request: {file.filename}")
try:
file_data = await file.read()
ext = file.filename.lower()
# --------- PDF with native text ---------
if ext.endswith(".pdf"):
logger.info("PDF detected → Extracting native text first")
reader = PdfReader(io.BytesIO(file_data))
direct_text = "".join(
page.extract_text() or "" for page in reader.pages
)
if direct_text.strip():
logger.info("Native PDF text found → No OCR needed")
return {"ocr_text": direct_text}
# -------- Fallback: scanned PDF OCR --------
logger.info("No native text → PDF treated as scanned → OCR")
from pdf2image import convert_from_bytes
images = convert_from_bytes(file_data)
text = ""
for i, img in enumerate(images):
logger.info(f"OCR page {i+1}/{len(images)}")
text += pytesseract.image_to_string(img) + "\n"
return {"ocr_text": text}
# --------- Image file OCR ---------
logger.info("Image detected → Running OCR")
img = Image.open(io.BytesIO(file_data))
text = pytesseract.image_to_string(img)
return {"ocr_text": text}
except Exception as e:
logger.error(f"OCR failed: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
# =============================
# 🧱 Structure / Layout
# =============================
@app.post("/structure")
async def structure(file: UploadFile):
logger.info(f"Received structure request: {file.filename}")
try:
file_data = await file.read()
ext = file.filename.lower()
if ext.endswith(".pdf"):
doc = DocumentFile.from_pdf(file_data)
logger.info(f"Structure prediction on PDF ({len(doc)} pages)")
else:
img = Image.open(io.BytesIO(file_data)).convert("RGB")
doc = DocumentFile.from_images([img])
logger.info("Structure prediction on image")
res = predictor(doc)
return {"structure": str(res)}
except Exception as e:
logger.error(f"Structure extraction failed: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
# =============================
# 📊 Tables extraction (PDF only)
# =============================
@app.post("/tables")
async def tables(file: UploadFile):
logger.info(f"Received table extraction request: {file.filename}")
try:
file_data = await file.read()
buffer = io.BytesIO(file_data)
tables = camelot.read_pdf(buffer)
logger.info(f"Found {len(tables)} tables")
return {"tables": [t.df.to_dict() for t in tables]}
except Exception as e:
logger.error(f"Table extraction failed: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
def safe_search(pattern, text, default=None, group_index=1, context=""):
"""Recherche sécurisée avec logging en cas d'absence de correspondance."""
m = re.search(pattern, text, re.I | re.S)
if not m:
logger.warning("Pattern not found for %s: %s", context, pattern)
return default
try:
return m.group(group_index).strip()
except IndexError:
logger.warning("Group index %d not found for %s: %s", group_index, context, pattern)
return default
def to_float(s):
if not s:
return None
s = s.replace(",", "").replace("Kgs", "").replace("kg", "").replace("%", "")
s = s.replace("lbs", "").replace("LBS", "")
s = s.strip()
try:
return float(s)
except:
return None
def section(text, start, end=None):
"""Extract a block of text between two headings, safely."""
pattern_start = re.escape(start)
if end:
pattern_end = re.escape(end)
reg = re.compile(pattern_start + r"(.*?)" + pattern_end, re.S | re.I)
else:
reg = re.compile(pattern_start + r"(.*)", re.S | re.I)
m = reg.search(text)
if not m:
logger.warning("Section not found: start='%s', end='%s'", start, end)
return ""
return m.group(1).strip()
def extract_field(text, label, default=None):
"""Extract a line of the form 'Label: value', safely."""
pattern = rf"{re.escape(label)}\s*:?[\s]+([^\n]+)"
return safe_search(pattern, text, default=default, context=f"field '{label}'")
def extract(label, text, default=None):
"""
Robust extraction for OCR/PDF text.
Works with:
Label: Value
Label Value
Label .... Value
"""
if not text:
return default
patterns = [
rf"{re.escape(label)}\s*[:\-]?\s*([^\n\r]+)",
rf"{re.escape(label)}\s+([^\n\r]+)"
]
for p in patterns:
m = re.search(p, text, re.I)
if m:
return m.group(1).strip()
return default
def extract_report_metadata(text):
logger.info("Starting metadata extraction, text length=%d", len(text))
try:
# ----------- SECTIONS -----------
order_details = section(text, "Order details", "Weights")
invoice_section = section(text, "INVOICE WEIGHTS", "Bales Weighed")
landed_section = section(text, "Bales Weighed", "Outturn")
loss_section = section(text, "LOSS", "Invoice average")
avg_section = section(text, "Invoice average", "Comments")
signature_block = section(text, "Signed on")
# ----------- TOP INFO -----------
top_info = {
"produced_on": extract_field(text, "Produced On"),
"printed_date": extract_field(text, "Printed Date"),
"client_reference": extract_field(text, "Client Reference"),
"report_number": safe_search(r"(AHK\S+)", text, default="", context="report_number", group_index=1),
}
# ----------- ORDER DETAILS -----------
parties = {
"client": extract_field(order_details, "Client"),
"client_ref_no": extract_field(order_details, "Client Ref No"),
"buyer": extract_field(order_details, "Buyer"),
"destination": extract_field(order_details, "Destination"),
}
shipment = {
"total_bales": extract_field(order_details, "Total Bales"),
"vessel": extract_field(order_details, "Vessel"),
"voyage_no": extract_field(order_details, "Voy. No"),
"bl_no": extract_field(order_details, "B/L No"),
"bl_date": extract_field(order_details, "B/L Date"),
"growth": extract_field(order_details, "Growth"),
"arrival_date": extract_field(order_details, "Arrival Date"),
"first_weighing_date": extract_field(order_details, "First date of weighing"),
"last_weighing_date": extract_field(order_details, "Last Date of Weighing"),
"weighing_method": extract_field(order_details, "Weighing method"),
"tare_basis": extract_field(order_details, "Tare"),
}
# ----------- INVOICE SECTION -----------
invoice = {
"bales": extract_field(invoice_section, "Bales"),
"gross": extract_field(invoice_section, "Gross"),
"tare": extract_field(invoice_section, "Tare"),
"net": extract_field(invoice_section, "Net"),
}
# ----------- LANDED SECTION -----------
landed = {
"bales": extract_field(landed_section, "Bales"),
"gross": extract_field(landed_section, "Gross"),
"tare": extract_field(landed_section, "Tare"),
"net": extract_field(landed_section, "Net"),
}
# ----------- LOSS SECTION -----------
loss = {
"kg": extract_field(loss_section, "kg"),
"lb": extract_field(loss_section, "lb"),
"percent": extract_field(loss_section, "Percentage"),
}
# ----------- AVERAGES SECTION -----------
averages = {
"invoice_gross_per_bale": extract_field(avg_section, "Invoice average"),
"landed_gross_per_bale": extract_field(avg_section, "Landed average"),
}
# ----------- SIGNATURE -----------
signature = {
"signed_on": extract_field(signature_block, "Signed on"),
"signed_by": safe_search(r"\n([A-Za-z ]+)\nClient Services", signature_block, default="", context="signed_by"),
"role": "Client Services Coordinator",
"company": "Alfred H. Knight International Limited"
}
logger.info("Metadata extraction completed successfully")
return {
"report": top_info,
"parties": parties,
"shipment": shipment,
"weights": {
"invoice": invoice,
"landed": landed,
"loss": loss,
"averages": averages
},
"signature": signature
}
except Exception as e:
logger.exception("Unexpected error during metadata extraction")
raise HTTPException(status_code=500, detail=f"Metadata extraction failed: {e}")
def detect_template(text):
t = text.lower()
if "alfred h. knight" in t and "cotton landing report" in t:
return "AHK"
if "intertek" in t and "landing report" in t:
return "INTERTEK"
if "robertson international" in t or "ri ref no" in t:
return "ROBERTSON"
if "landing report" in t and "carcon cargo" in t:
return "SGS"
if "pacific inspection company" in t or "picl-bd.com" in t:
return "PICL"
return "UNKNOWN"
@app.post("/metadata")
async def metadata(text: str = Body(..., embed=True)):
return extract_report_metadata(text)
@app.post("/parse")
async def parse_endpoint(text: str = Body(..., embed=True)):
return parse_report(text)
PARSERS = {
"AHK": AHKParser(),
"INTERTEK": IntertekParser(),
"ROBERTSON": RobertsonParser(),
"SGS": SGSParser(),
"PICL": PICLParser()
}
def empty_weight_report(lab):
return {
"lab": lab,
"report": {"reference": None, "file_no": None, "date": None},
"contract": {"contract_no": None, "invoice_no": None, "lc_no": None, "origin": None, "commodity": None},
"parties": {"seller": None, "buyer": None, "carrier": None},
"shipment": {
"vessel": None, "bl_no": None, "port_loading": None,
"port_destination": None, "arrival_date": None,
"weighing_place": None, "weighing_method": None,
"bales": None
},
"weights": {
"gross_landed_kg": None, "tare_kg": None,
"net_landed_kg": None, "invoice_net_kg": None,
"gain_loss_kg": None, "gain_loss_percent": None
}
}
def parse_report(text):
template=detect_template(text)
if template not in PARSERS:
return {"template":"UNKNOWN"}
return PARSERS[template].parse(text)