Files
automation-service/app.py
2026-01-11 21:29:26 +01:00

867 lines
32 KiB
Python

from fastapi import FastAPI, UploadFile, HTTPException, Body
from PIL import Image
import pytesseract
from doctr.models import ocr_predictor
from doctr.io import DocumentFile
from PyPDF2 import PdfReader
import pdfplumber
import camelot
import spacy
import logging
import io
from logging.handlers import RotatingFileHandler
import re
from datetime import datetime
LOG_PATH = "/var/log/automation-service.log"
file_handler = RotatingFileHandler(
LOG_PATH,
maxBytes=10*1024*1024,
backupCount=5,
encoding="utf-8"
)
file_handler.setFormatter(logging.Formatter(
"%(asctime)s - %(levelname)s - %(name)s - %(message)s"
))
class AHKParser:
lab = "AHK"
def _clean_value(self, value):
"""Nettoie la valeur en supprimant les espaces inutiles"""
if value:
return value.strip()
return value
def parse(self, text):
"""Parse le texte et retourne un dictionnaire structuré"""
result = {
"lab": self.lab,
"report": self._extract_report_info(text),
"contract": self._extract_contract_info(text),
"parties": self._extract_parties_info(text),
"shipment": self._extract_shipment_info(text),
"weights": self._extract_weights_info(text)
}
self.data = result
return result
def _extract_report_info(self, text):
"""Extrait les informations du rapport"""
report_info = {
"reference": None,
"file_no": None,
"date": None
}
# Recherche de la référence client - plus précise
ref_match = re.search(r'Client\s+Reference:\s*(S-\d+\s*/\s*INV\s*\d+)', text)
if ref_match:
report_info["reference"] = self._clean_value(ref_match.group(1))
# Recherche du numéro de fichier AHK
file_no_match = re.search(r'AHK\s+S/([\w/]+)', text)
if file_no_match:
report_info["file_no"] = self._clean_value(file_no_match.group(1))
# Recherche de la date du rapport
date_match = re.search(r'Signed\s+on\s*(\d{1,2}-[A-Za-z]{3}-\d{4})', text)
if date_match:
report_info["date"] = self._clean_value(date_match.group(1))
return report_info
def _extract_contract_info(self, text):
"""Extrait les informations du contrat"""
contract_info = {
"contract_no": None,
"invoice_no": None,
"lc_no": None,
"origin": None,
"commodity": None
}
# Extraction de la référence client
ref_match = re.search(r'Client\s+Ref\s+No\.\s*:\s*([^\n]+)', text)
if ref_match:
ref_text = ref_match.group(1).strip()
# Sépare S-3488 et INV 4013
parts = re.split(r'[/\s]+', ref_text)
for part in parts:
if part.startswith('S-'):
contract_info["contract_no"] = part.strip()
elif part.startswith('INV'):
contract_info["invoice_no"] = part.strip()
# Extraction de l'origine et de la marchandise - regex plus précise
growth_match = re.search(r'Growth\s*:\s*([A-Z\s]+?)(?=\s*(?:Vessel|$))', text)
if growth_match:
origin_text = growth_match.group(1).strip()
if "AUSTRALIAN" in origin_text.upper():
contract_info["origin"] = "AUSTRALIA"
contract_info["commodity"] = "RAW COTTON"
return contract_info
def _extract_parties_info(self, text):
"""Extrait les informations sur les parties"""
parties_info = {
"seller": None,
"buyer": None,
"carrier": None
}
# Extraction du vendeur (Client) - regex plus précise
seller_match = re.search(r'Client\s*:\s*([^\n:]+?)(?=\s*(?:Client Ref|Buyer|$))', text)
if seller_match:
parties_info["seller"] = self._clean_value(seller_match.group(1))
# Extraction de l'acheteur (Buyer) - regex plus précise
buyer_match = re.search(r'Buyer\s*:\s*([^\n:]+?)(?=\s*(?:Total Bales|$))', text)
if buyer_match:
parties_info["buyer"] = self._clean_value(buyer_match.group(1))
# Extraction du transporteur (nom du navire seulement)
vessel_match = re.search(r'Vessel\s*:\s*([A-Z\s]+?)(?=\s*(?:Arrival|Voy|$))', text)
if vessel_match:
parties_info["carrier"] = self._clean_value(vessel_match.group(1))
return parties_info
def _extract_shipment_info(self, text):
"""Extrait les informations d'expédition"""
shipment_info = {
"vessel": None,
"bl_no": None,
"bl_date": None,
"port_loading": None,
"port_destination": None,
"arrival_date": None,
"weighing_place": None,
"weighing_method": None,
"bales": None
}
# Extraction du navire (nom seulement)
vessel_match = re.search(r'Vessel\s*:\s*([A-Z\s]+?)(?=\s*(?:Arrival|Voy|$))', text)
if vessel_match:
shipment_info["vessel"] = self._clean_value(vessel_match.group(1))
# Extraction du numéro de connaissement (seulement le numéro)
bl_no_match = re.search(r'B/L\s+No\.\s*:\s*([A-Z0-9]+)(?=\s|$)', text)
if bl_no_match:
shipment_info["bl_no"] = self._clean_value(bl_no_match.group(1))
# Extraction de la date du connaissement
bl_date_match = re.search(r'B/L\s+Date\s*:\s*(\d{1,2}-[A-Za-z]{3}-\d{4})(?=\s|$)', text)
if bl_date_match:
shipment_info["bl_date"] = self._clean_value(bl_date_match.group(1))
# Extraction du port de destination (sans le "Tare")
dest_match = re.search(r'Destination\s*:\s*([A-Z,\s]+?)(?=\s*(?:Tare|$))', text)
if dest_match:
shipment_info["port_destination"] = self._clean_value(dest_match.group(1))
# Extraction de la date d'arrivée
arrival_match = re.search(r'Arrival\s+Date\s*:\s*(\d{1,2}-[A-Za-z]{3}-\d{4})(?=\s|$)', text)
if arrival_match:
shipment_info["arrival_date"] = self._clean_value(arrival_match.group(1))
# Extraction de la méthode de pesée
weighing_method_match = re.search(r'Weighing\s+method\s*:\s*([^\n]+?)(?=\s*(?:Tare|$))', text)
if weighing_method_match:
shipment_info["weighing_method"] = self._clean_value(weighing_method_match.group(1))
# Extraction du nombre de balles
bales_match = re.search(r'Total\s+Bales\s*:\s*(\d+)(?=\s|$)', text)
if bales_match:
try:
shipment_info["bales"] = int(bales_match.group(1).strip())
except ValueError:
shipment_info["bales"] = None
return shipment_info
def _extract_weights_info(self, text):
"""Extrait les informations de poids"""
weights_info = {
"gross_landed_kg": None,
"tare_kg": None,
"net_landed_kg": None,
"invoice_net_kg": None,
"gain_loss_kg": None,
"gain_loss_percent": None
}
# Extraction du poids brut débarqué (corrigé - doit être 100580 kg)
gross_landed_match = re.search(r'LANDED WEIGHTS[\s\S]*?Gross\s*:\s*([\d.,]+)\s*kg', text)
if gross_landed_match:
try:
weights_info["gross_landed_kg"] = float(gross_landed_match.group(1).replace(',', '').strip())
except ValueError:
pass
# Extraction du poids de tare
tare_match = re.search(r'Tare\s*:\s*([\d.,]+)\s*kg', text)
if tare_match:
try:
weights_info["tare_kg"] = float(tare_match.group(1).replace(',', '').strip())
except ValueError:
pass
# Extraction du poids net débarqué (corrigé - doit être 100078.40 kg)
net_landed_match = re.search(r'LANDED WEIGHTS[\s\S]*?Net\s*:\s*([\d.,]+)\s*kg', text)
if net_landed_match:
try:
weights_info["net_landed_kg"] = float(net_landed_match.group(1).replace(',', '').strip())
except ValueError:
pass
# Extraction du poids net facturé (101299 kg)
invoice_net_match = re.search(r'INVOICE WEIGHTS[\s\S]*?Net\s*:\s*([\d.,]+)\s*kg', text)
if invoice_net_match:
try:
weights_info["invoice_net_kg"] = float(invoice_net_match.group(1).replace(',', '').strip())
except ValueError:
pass
# Extraction de la perte en kg
loss_match = re.search(r'LOSS\s*:\s*-\s*([\d.,]+)\s*kg', text)
if loss_match:
try:
weights_info["gain_loss_kg"] = -float(loss_match.group(1).replace(',', '').strip())
except ValueError:
pass
# Extraction du pourcentage de perte
percent_match = re.search(r'Percentage\s*:\s*-\s*([\d.,]+)%', text)
if percent_match:
try:
weights_info["gain_loss_percent"] = -float(percent_match.group(1).replace(',', '').strip())
except ValueError:
pass
return weights_info
import re
class IntertekParser:
lab = "Intertek"
def _clean_value(self, value):
"""Nettoie la valeur en supprimant les espaces inutiles"""
if value:
return value.strip()
return value
def _extract_number(self, text, pattern, is_int=False):
"""Extrait un nombre (int ou float) du texte selon un pattern regex"""
match = re.search(pattern, text)
if match:
try:
# Nettoie la chaîne numérique
num_str = match.group(1).replace(',', '').replace(' ', '').strip()
if is_int:
return int(num_str)
else:
return float(num_str)
except (ValueError, AttributeError):
return None
return None
def parse(self, text):
"""Parse le texte et retourne un dictionnaire structuré"""
result = {
"lab": self.lab,
"report": self._extract_report_info(text),
"contract": self._extract_contract_info(text),
"parties": self._extract_parties_info(text),
"shipment": self._extract_shipment_info(text),
"weights": self._extract_weights_info(text)
}
return result
def _extract_report_info(self, text):
"""Extrait les informations du rapport"""
report_info = {
"reference": None,
"file_no": None,
"date": None
}
# Recherche de la référence globale
ref_match = re.search(r'Global Ref\s*:\s*(GLO-\d+-[A-Z]+)', text)
if ref_match:
report_info["reference"] = self._clean_value(ref_match.group(1))
# Recherche du numéro de fichier
file_no_match = re.search(r'Report\s*/\s*File No\s*:\s*([A-Z]+-AGR\d+-?)', text)
if file_no_match:
report_info["file_no"] = self._clean_value(file_no_match.group(1))
# Recherche de la date du rapport
date_match = re.search(r'Dated\s*:\s*(\d{1,2}\.\d{1,2}\.\d{4})', text)
if date_match:
report_info["date"] = self._clean_value(date_match.group(1))
return report_info
def _extract_contract_info(self, text):
"""Extrait les informations du contrat"""
contract_info = {
"contract_no": None,
"invoice_no": None,
"lc_no": None, # Non présent dans ce rapport
"origin": None,
"commodity": None
}
# Extraction du numéro de contrat
contract_match = re.search(r'Contract No\s*:\s*([A-Z]?-\d+)', text)
if contract_match:
contract_info["contract_no"] = self._clean_value(contract_match.group(1))
# Extraction du numéro de facture
invoice_match = re.search(r'Invoice No\s*:\s*(\d+)', text)
if invoice_match:
contract_info["invoice_no"] = self._clean_value(invoice_match.group(1))
# Extraction de l'origine et de la marchandise
growth_match = re.search(r'Growth\s*:\s*([A-Z\s]+)(?=\s*Shipper|\n|$)', text)
if growth_match:
origin_text = growth_match.group(1).strip()
if "GREECE" in origin_text.upper():
contract_info["origin"] = "GREECE"
contract_info["commodity"] = "RAW COTTON"
return contract_info
def _extract_parties_info(self, text):
"""Extrait les informations sur les parties"""
parties_info = {
"seller": None,
"buyer": None,
"carrier": None
}
# Extraction du vendeur (Shipper)
seller_match = re.search(r'Shipper\s*:\s*([^\n]+?)(?=\s*(?:Buyer|$))', text)
if seller_match:
parties_info["seller"] = self._clean_value(seller_match.group(1))
# Extraction de l'acheteur (Buyer)
buyer_match = re.search(r'Buyer\s*:\s*([^\n]+?)(?=\s*(?:CONTAINER|TOTAL|$))', text)
if buyer_match:
parties_info["buyer"] = self._clean_value(buyer_match.group(1))
# Extraction du transporteur (nom du navire seulement)
vessel_match = re.search(r'Vessel\s*:\s*([A-Z\s]+?)(?=\s*(?:Arrival|$))', text)
if vessel_match:
parties_info["carrier"] = self._clean_value(vessel_match.group(1))
return parties_info
def _extract_shipment_info(self, text):
"""Extrait les informations d'expédition"""
shipment_info = {
"vessel": None,
"bl_no": None,
"bl_date": None, # Non présent dans ce rapport
"port_loading": None, # Non présent dans ce rapport
"port_destination": None, # Non présent dans ce rapport
"arrival_date": None,
"weighing_place": None,
"weighing_method": None,
"bales": None
}
# Extraction du navire
vessel_match = re.search(r'Vessel\s*:\s*([A-Z\s]+?)(?=\s*(?:Arrival|$))', text)
if vessel_match:
shipment_info["vessel"] = self._clean_value(vessel_match.group(1))
# Extraction du numéro de connaissement
bl_no_match = re.search(r'B/L\s+No\.\s*:\s*([A-Z0-9]+)', text)
if bl_no_match:
shipment_info["bl_no"] = self._clean_value(bl_no_match.group(1))
# Extraction de la date d'arrivée
arrival_match = re.search(r'Arrival Date\s*:\s*(\d{1,2}\.\d{1,2}\.\d{4})', text)
if arrival_match:
shipment_info["arrival_date"] = self._clean_value(arrival_match.group(1))
# Extraction du lieu de pesée
weighing_place_match = re.search(r'Weighed at\s*:\s*([^\n]+?)(?=\s*(?:Vessel|$))', text)
if weighing_place_match:
shipment_info["weighing_place"] = self._clean_value(weighing_place_match.group(1))
# Extraction de la méthode de pesée
# Recherche dans les remarques
remarks_section = re.search(r'REMARKS\s*(.+?)(?=ISSUED BY|$)', text, re.DOTALL | re.IGNORECASE)
if remarks_section:
remarks_text = remarks_section.group(1)
if "weighbridge" in remarks_text.lower():
shipment_info["weighing_method"] = "Weighbridge weighing by empty/full truck"
# Extraction du nombre de balles (à partir du total)
bales_match = re.search(r'TOTAL\s+(\d{1,4}(?:,\d{3})?)\s+[\d,]+\.\d{2}', text)
if not bales_match:
# Essayons une autre approche
bales_match = re.search(r'Invoice Quantity\s*:\s*(\d+)\s+Bales', text)
if bales_match:
try:
bales_str = bales_match.group(1).replace(',', '').strip()
shipment_info["bales"] = int(bales_str)
except ValueError:
shipment_info["bales"] = None
return shipment_info
def _extract_weights_info(self, text):
"""Extrait les informations de poids"""
weights_info = {
"gross_landed_kg": None,
"tare_kg": None,
"net_landed_kg": None,
"invoice_net_kg": None,
"gain_loss_kg": None,
"gain_loss_percent": None
}
# Extraction du poids brut débarqué
gross_match = re.search(r'Gross Landed Weight\s*:\s*([\d,]+\.\d{2})\s*kgs', text)
if gross_match:
weights_info["gross_landed_kg"] = float(gross_match.group(1).replace(',', ''))
# Extraction du poids de tare
tare_match = re.search(r'Invoice Tare\s*:\s*([\d,]+\.\d{2})\s*Kgs', text)
if tare_match:
weights_info["tare_kg"] = float(tare_match.group(1).replace(',', ''))
# Extraction du poids net débarqué
net_landed_match = re.search(r'Net Landed Weight\s*:\s*([\d,]+\.\d{2})\s*Kgs', text)
if net_landed_match:
weights_info["net_landed_kg"] = float(net_landed_match.group(1).replace(',', ''))
# Extraction du poids net facturé
invoice_net_match = re.search(r'Net Invoice Weight\s*:\s*([\d,]+\.\d{2})\s*Kgs', text)
if invoice_net_match:
weights_info["invoice_net_kg"] = float(invoice_net_match.group(1).replace(',', ''))
# Extraction du gain en kg
gain_match = re.search(r'Gain\s+([\d,]+\.\d{2})\s*Kgs', text)
if gain_match:
weights_info["gain_loss_kg"] = float(gain_match.group(1).replace(',', ''))
# Extraction du pourcentage de gain (0.4% dans le tableau)
percent_match = re.search(r'TOTAL\s+\d+\s+[\d,]+\.\d{2}\s+([\d.]+)%', text)
if percent_match:
try:
weights_info["gain_loss_percent"] = float(percent_match.group(1))
except ValueError:
pass
return weights_info
# Configure root logger explicitly
root = logging.getLogger()
root.setLevel(logging.INFO)
root.addHandler(file_handler)
root.addHandler(logging.StreamHandler())
# Use root logger for your app
logger = logging.getLogger(__name__)
app = FastAPI()
logger.info("Loading models...")
nlp = spacy.load("en_core_web_sm")
predictor = ocr_predictor(pretrained=True)
logger.info("Models loaded successfully.")
# =============================
# 🧠 Smart OCR
# =============================
# @app.post("/ocr")
# async def ocr(file: UploadFile):
# logger.info(f"Received OCR request: {file.filename}")
# try:
# file_data = await file.read()
# ext = file.filename.lower()
# # --------- PDF with native text ---------
# if ext.endswith(".pdf"):
# logger.info("PDF detected → Extracting native text first")
# reader = PdfReader(io.BytesIO(file_data))
# direct_text = "".join(
# page.extract_text() or "" for page in reader.pages
# )
# if direct_text.strip():
# logger.info("Native PDF text found → No OCR needed")
# return {"ocr_text": direct_text}
# # -------- Fallback: scanned PDF OCR --------
# logger.info("No native text → PDF treated as scanned → OCR")
# from pdf2image import convert_from_bytes
# images = convert_from_bytes(file_data)
# text = ""
# for i, img in enumerate(images):
# logger.info(f"OCR page {i+1}/{len(images)}")
# text += pytesseract.image_to_string(img) + "\n"
# return {"ocr_text": text}
# # --------- Image file OCR ---------
# logger.info("Image detected → Running OCR")
# img = Image.open(io.BytesIO(file_data))
# text = pytesseract.image_to_string(img)
# return {"ocr_text": text}
# except Exception as e:
# logger.error(f"OCR failed: {e}", exc_info=True)
# raise HTTPException(status_code=500, detail=str(e))
@app.post("/ocr")
async def ocr(file: UploadFile):
"""
Smart PDF processing optimized for cotton landing reports
"""
logger.info(f"Smart OCR request: {file.filename}")
try:
file_data = await file.read()
# Strategy 1: Try pdfplumber (best for digital PDFs)
try:
with pdfplumber.open(io.BytesIO(file_data)) as pdf:
text_parts = []
tables_found = []
for page in pdf.pages:
# Extract text
page_text = page.extract_text(x_tolerance=2, y_tolerance=2)
if page_text:
text_parts.append(page_text)
# Look for tables (common in landing reports)
tables = page.extract_tables({
"vertical_strategy": "text",
"horizontal_strategy": "text",
"snap_tolerance": 5,
})
for table in tables:
if table and len(table) > 1:
tables_found.append(table)
combined_text = "\n".join(text_parts)
return {"ocr_text": combined_text}
# if combined_text.strip():
# logger.info(f"pdfplumber extracted {len(combined_text)} chars")
# # Try parsing structured data
# structured_data = parse_cotton_report(combined_text)
# # Check if we got key fields
# if (structured_data.get("shipment", {}).get("bales") and
# structured_data.get("weights", {}).get("net_landed_kg")):
# logger.info("Successfully parsed structured data from pdfplumber")
# return {
# "method": "pdfplumber",
# "structured_data": structured_data,
# "raw_text_sample": combined_text[:500]
# }
except Exception as e:
logger.warning(f"pdfplumber attempt: {e}")
# from pdf2image import convert_from_bytes
# images = convert_from_bytes(file_data, dpi=200)
# ocr_results = []
# for img in images:
# text = pytesseract.image_to_string(
# img,
# config='--psm 6 -c preserve_interword_spaces=1'
# )
# ocr_results.append(text)
# ocr_text = "\n".join(ocr_results)
# return {
# "method": "tesseract_ocr",
# "structured_data": ocr_text,
# "raw_text_sample": ocr_text[:500]
# }
except Exception as e:
logger.error(f"Smart OCR failed: {e}", exc_info=True)
return {
"error": str(e),
"success": False
}
# =============================
# 🧱 Structure / Layout
# =============================
@app.post("/structure")
async def structure(file: UploadFile):
logger.info(f"Received structure request: {file.filename}")
try:
file_data = await file.read()
ext = file.filename.lower()
if ext.endswith(".pdf"):
doc = DocumentFile.from_pdf(file_data)
logger.info(f"Structure prediction on PDF ({len(doc)} pages)")
else:
img = Image.open(io.BytesIO(file_data)).convert("RGB")
doc = DocumentFile.from_images([img])
logger.info("Structure prediction on image")
res = predictor(doc)
return {"structure": str(res)}
except Exception as e:
logger.error(f"Structure extraction failed: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
# =============================
# 📊 Tables extraction (PDF only)
# =============================
@app.post("/tables")
async def tables(file: UploadFile):
logger.info(f"Received table extraction request: {file.filename}")
try:
file_data = await file.read()
buffer = io.BytesIO(file_data)
tables = camelot.read_pdf(buffer)
logger.info(f"Found {len(tables)} tables")
return {"tables": [t.df.to_dict() for t in tables]}
except Exception as e:
logger.error(f"Table extraction failed: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
def safe_search(pattern, text, default=None, group_index=1, context=""):
"""Recherche sécurisée avec logging en cas d'absence de correspondance."""
m = re.search(pattern, text, re.I | re.S)
if not m:
logger.warning("Pattern not found for %s: %s", context, pattern)
return default
try:
return m.group(group_index).strip()
except IndexError:
logger.warning("Group index %d not found for %s: %s", group_index, context, pattern)
return default
def to_float(s):
if not s:
return None
s = s.replace(",", "").replace("Kgs", "").replace("kg", "").replace("%", "")
s = s.replace("lbs", "").replace("LBS", "")
s = s.strip()
try:
return float(s)
except:
return None
def section(text, start, end=None):
"""Extract a block of text between two headings, safely."""
pattern_start = re.escape(start)
if end:
pattern_end = re.escape(end)
reg = re.compile(pattern_start + r"(.*?)" + pattern_end, re.S | re.I)
else:
reg = re.compile(pattern_start + r"(.*)", re.S | re.I)
m = reg.search(text)
if not m:
logger.warning("Section not found: start='%s', end='%s'", start, end)
return ""
return m.group(1).strip()
def extract_field(text, label, default=None):
"""Extract a line of the form 'Label: value', safely."""
pattern = rf"{re.escape(label)}\s*:?[\s]+([^\n]+)"
return safe_search(pattern, text, default=default, context=f"field '{label}'")
def extract(label, text, default=None):
"""
Robust extraction for OCR/PDF text.
Works with:
Label: Value
Label Value
Label .... Value
"""
if not text:
return default
patterns = [
rf"{re.escape(label)}\s*[:\-]?\s*([^\n\r]+)",
rf"{re.escape(label)}\s+([^\n\r]+)"
]
for p in patterns:
m = re.search(p, text, re.I)
if m:
return m.group(1).strip()
return default
def extract_report_metadata(text):
logger.info("Starting metadata extraction, text length=%d", len(text))
try:
# ----------- SECTIONS -----------
order_details = section(text, "Order details", "Weights")
invoice_section = section(text, "INVOICE WEIGHTS", "Bales Weighed")
landed_section = section(text, "Bales Weighed", "Outturn")
loss_section = section(text, "LOSS", "Invoice average")
avg_section = section(text, "Invoice average", "Comments")
signature_block = section(text, "Signed on")
# ----------- TOP INFO -----------
top_info = {
"produced_on": extract_field(text, "Produced On"),
"printed_date": extract_field(text, "Printed Date"),
"client_reference": extract_field(text, "Client Reference"),
"report_number": safe_search(r"(AHK\S+)", text, default="", context="report_number", group_index=1),
}
# ----------- ORDER DETAILS -----------
parties = {
"client": extract_field(order_details, "Client"),
"client_ref_no": extract_field(order_details, "Client Ref No"),
"buyer": extract_field(order_details, "Buyer"),
"destination": extract_field(order_details, "Destination"),
}
shipment = {
"total_bales": extract_field(order_details, "Total Bales"),
"vessel": extract_field(order_details, "Vessel"),
"voyage_no": extract_field(order_details, "Voy. No"),
"bl_no": extract_field(order_details, "B/L No"),
"bl_date": extract_field(order_details, "B/L Date"),
"growth": extract_field(order_details, "Growth"),
"arrival_date": extract_field(order_details, "Arrival Date"),
"first_weighing_date": extract_field(order_details, "First date of weighing"),
"last_weighing_date": extract_field(order_details, "Last Date of Weighing"),
"weighing_method": extract_field(order_details, "Weighing method"),
"tare_basis": extract_field(order_details, "Tare"),
}
# ----------- INVOICE SECTION -----------
invoice = {
"bales": extract_field(invoice_section, "Bales"),
"gross": extract_field(invoice_section, "Gross"),
"tare": extract_field(invoice_section, "Tare"),
"net": extract_field(invoice_section, "Net"),
}
# ----------- LANDED SECTION -----------
landed = {
"bales": extract_field(landed_section, "Bales"),
"gross": extract_field(landed_section, "Gross"),
"tare": extract_field(landed_section, "Tare"),
"net": extract_field(landed_section, "Net"),
}
# ----------- LOSS SECTION -----------
loss = {
"kg": extract_field(loss_section, "kg"),
"lb": extract_field(loss_section, "lb"),
"percent": extract_field(loss_section, "Percentage"),
}
# ----------- AVERAGES SECTION -----------
averages = {
"invoice_gross_per_bale": extract_field(avg_section, "Invoice average"),
"landed_gross_per_bale": extract_field(avg_section, "Landed average"),
}
# ----------- SIGNATURE -----------
signature = {
"signed_on": extract_field(signature_block, "Signed on"),
"signed_by": safe_search(r"\n([A-Za-z ]+)\nClient Services", signature_block, default="", context="signed_by"),
"role": "Client Services Coordinator",
"company": "Alfred H. Knight International Limited"
}
logger.info("Metadata extraction completed successfully")
return {
"report": top_info,
"parties": parties,
"shipment": shipment,
"weights": {
"invoice": invoice,
"landed": landed,
"loss": loss,
"averages": averages
},
"signature": signature
}
except Exception as e:
logger.exception("Unexpected error during metadata extraction")
raise HTTPException(status_code=500, detail=f"Metadata extraction failed: {e}")
def detect_template(text):
t = text.lower()
if "alfred h. knight" in t and "cotton landing report" in t:
return "AHK"
if "intertek" in t and "landing report" in t:
return "INTERTEK"
if "robertson international" in t or "ri ref no" in t:
return "ROBERTSON"
if "landing report" in t and "carcon cargo" in t:
return "SGS"
if "pacific inspection company" in t or "picl-bd.com" in t:
return "PICL"
return "UNKNOWN"
@app.post("/metadata")
async def metadata(text: str = Body(..., embed=True)):
return extract_report_metadata(text)
@app.post("/parse")
async def parse_endpoint(text: str = Body(..., embed=True)):
return parse_report(text)
PARSERS = {
"AHK": AHKParser(),
"INTERTEK": IntertekParser()
}
def empty_weight_report(lab):
return {
"lab": lab,
"report": {"reference": None, "file_no": None, "date": None},
"contract": {"contract_no": None, "invoice_no": None, "lc_no": None, "origin": None, "commodity": None},
"parties": {"seller": None, "buyer": None, "carrier": None},
"shipment": {
"vessel": None, "bl_no": None, "bl_date": None, "port_loading": None,
"port_destination": None, "arrival_date": None,
"weighing_place": None, "weighing_method": None,
"bales": None
},
"weights": {
"gross_landed_kg": None, "tare_kg": None,
"net_landed_kg": None, "invoice_net_kg": None,
"gain_loss_kg": None, "gain_loss_percent": None
}
}
def parse_report(text):
template=detect_template(text)
if template not in PARSERS:
return {"template":"UNKNOWN"}
return PARSERS[template].parse(text)