This commit is contained in:
2026-01-11 17:52:26 +01:00
parent 860782c0de
commit a55a956b61

282
app.py
View File

@@ -426,7 +426,6 @@ class PICLParser:
r["weights"]["gain_loss_percent"]=to_float(safe_search(r"\(\s*([0-9.,]+)\s*o/o\s*\)",text)) r["weights"]["gain_loss_percent"]=to_float(safe_search(r"\(\s*([0-9.,]+)\s*o/o\s*\)",text))
return r return r
# Configure root logger explicitly # Configure root logger explicitly
root = logging.getLogger() root = logging.getLogger()
root.setLevel(logging.INFO) root.setLevel(logging.INFO)
@@ -444,48 +443,263 @@ predictor = ocr_predictor(pretrained=True)
logger.info("Models loaded successfully.") logger.info("Models loaded successfully.")
# ============================= import io
# 🧠 Smart OCR import re
# ============================= from datetime import datetime
from typing import Dict, Any
import pytesseract
from pdf2image import convert_from_bytes
from PIL import Image
from PyPDF2 import PdfReader
import json
def parse_cotton_report(ocr_text: str) -> Dict[str, Any]:
    """
    Parse structured data from cotton landing-report OCR text.

    Extracts the report reference/date, contract and party details,
    shipment information and the landed-weight figures from free-form
    text produced by OCR (or native PDF extraction) of an AHK cotton
    landing report.

    Args:
        ocr_text: Raw report text, one field per line ("Label: value").

    Returns:
        Nested dict with keys "lab", "report", "contract", "parties",
        "shipment" and "weights". Any field that cannot be located in
        the text is left as None. Loss figures ("gain_loss_kg",
        "gain_loss_percent") are stored as negative numbers.
    """
    result = {
        "lab": "ALFRED H KNIGHT",
        "report": {"reference": None, "file_no": None, "date": None},
        "contract": {"contract_no": None, "invoice_no": None, "lc_no": None,
                     "origin": None, "commodity": None},
        "parties": {"seller": None, "buyer": None, "carrier": None},
        "shipment": {
            "vessel": None, "bl_no": None, "port_loading": None,
            "port_destination": None, "arrival_date": None,
            "weighing_place": None, "weighing_method": None,
            "bales": None
        },
        "weights": {
            "gross_landed_kg": None, "tare_kg": None,
            "net_landed_kg": None, "invoice_net_kg": None,
            "gain_loss_kg": None, "gain_loss_percent": None
        }
    }

    # Lower-cased copy used by the date patterns, which match lowercase
    # month names and then .title() the result back.
    text = ocr_text.lower()

    # 1. Report reference and file number
    ref_match = re.search(r'client reference:\s*([^\n]+)', ocr_text, re.IGNORECASE)
    if ref_match:
        result["report"]["reference"] = ref_match.group(1).strip()

    # File number comes from the AHK reference, e.g. "AHK S/678/2024".
    ahk_match = re.search(r'ahk s/(\d+)/', ocr_text, re.IGNORECASE)
    if ahk_match:
        result["report"]["file_no"] = ahk_match.group(1)

    # 2. Report date, e.g. "Printed Date: 15-january-2024"
    date_match = re.search(r'printed date:\s*(\d{1,2}-[a-z]+-\d{4})', text, re.IGNORECASE)
    if date_match:
        result["report"]["date"] = date_match.group(1).title()

    # 3. Contract information: origin/growth and commodity
    growth_match = re.search(r'growth\s*:?\s*([^\n:]+(?:raw cotton)?)', ocr_text, re.IGNORECASE)
    if growth_match:
        result["contract"]["origin"] = growth_match.group(1).strip()
        result["contract"]["commodity"] = "COTTON"

    # Invoice number is embedded in the client reference ("... INV 12345").
    if result["report"]["reference"]:
        inv_match = re.search(r'inv\s*(\d+)', result["report"]["reference"], re.IGNORECASE)
        if inv_match:
            result["contract"]["invoice_no"] = inv_match.group(1)

    # 4. Parties
    # Seller appears on a "Client:" line; the lookahead keeps the pattern
    # from latching onto the earlier "Client Reference:" line instead.
    seller_match = re.search(r'client\s*:?\s*(?!\s*reference)([^\n]+)', ocr_text, re.IGNORECASE)
    if seller_match:
        seller_text = seller_match.group(1).strip()
        # Skip a duplicated "Client" label if OCR merged two columns.
        if not seller_text.lower().startswith('client'):
            result["parties"]["seller"] = seller_text

    buyer_match = re.search(r'buyer\s*:?\s*([^\n]+)', ocr_text, re.IGNORECASE)
    if buyer_match:
        buyer_text = buyer_match.group(1).strip()
        if not buyer_text.lower().startswith('buyer'):
            result["parties"]["buyer"] = buyer_text

    # 5. Shipment details
    vessel_match = re.search(r'vessel\s*:?\s*([^\n]+)', ocr_text, re.IGNORECASE)
    if vessel_match:
        vessel_text = vessel_match.group(1).strip()
        if not vessel_text.lower().startswith('vessel'):
            result["shipment"]["vessel"] = vessel_text

    bl_match = re.search(r'b/l no\.?\s*:?\s*([^\n]+)', ocr_text, re.IGNORECASE)
    if bl_match:
        result["shipment"]["bl_no"] = bl_match.group(1).strip()

    dest_match = re.search(r'destination\s*:?\s*([^\n]+)', ocr_text, re.IGNORECASE)
    if dest_match:
        dest_text = dest_match.group(1).strip()
        if not dest_text.lower().startswith('destination'):
            result["shipment"]["port_destination"] = dest_text

    arrival_match = re.search(r'arrival date\s*:?\s*(\d{1,2}-[a-z]+-\d{4})', text, re.IGNORECASE)
    if arrival_match:
        result["shipment"]["arrival_date"] = arrival_match.group(1).title()

    weigh_match = re.search(r'weighing method\s*:?\s*([^\n]+)', ocr_text, re.IGNORECASE)
    if weigh_match:
        method_text = weigh_match.group(1).strip()
        if not method_text.lower().startswith('weighing'):
            result["shipment"]["weighing_method"] = method_text

    bales_match = re.search(r'total bales\s*:?\s*(\d+)', ocr_text, re.IGNORECASE)
    if bales_match:
        result["shipment"]["bales"] = int(bales_match.group(1))

    # 6. Weights (critical section). All number patterns share the
    #    IGNORECASE flag so "Kg"/"KG" variants are matched too.
    # Gross: the report lists invoice gross first and landed gross
    # second, so the second occurrence is the landed figure.
    all_gross = re.findall(r'gross\s*:?\s*([\d,\.]+)\s*kg', ocr_text, re.IGNORECASE)
    if len(all_gross) >= 2:
        result["weights"]["gross_landed_kg"] = float(all_gross[1].replace(',', ''))

    # Tare weight (identical on both sides of the report).
    tare_match = re.search(r'tare\s*:?\s*([\d,\.]+)\s*kg', ocr_text, re.IGNORECASE)
    if tare_match:
        result["weights"]["tare_kg"] = float(tare_match.group(1).replace(',', ''))

    # Net weights: first occurrence is the invoice net, second the landed net.
    net_matches = re.findall(r'net\s*:?\s*([\d,\.]+)\s*kg', ocr_text, re.IGNORECASE)
    if len(net_matches) >= 2:
        result["weights"]["invoice_net_kg"] = float(net_matches[0].replace(',', ''))
        result["weights"]["net_landed_kg"] = float(net_matches[1].replace(',', ''))

    # Loss in kg. The capturing group cannot include a sign, and a value
    # reported under a "loss" label is a shortfall, so store it negative.
    loss_match = re.search(r'loss\s*:?\s*[-]?\s*([\d,\.]+)\s*kg', ocr_text, re.IGNORECASE)
    if loss_match:
        result["weights"]["gain_loss_kg"] = -float(loss_match.group(1).replace(',', ''))

    # Loss percentage: same sign convention as the kg figure.
    percent_match = re.search(r'percentage\s*:?\s*[-]?\s*([\d,\.]+)%', ocr_text, re.IGNORECASE)
    if percent_match:
        result["weights"]["gain_loss_percent"] = -float(percent_match.group(1).replace(',', ''))

    return result
@app.post("/ocr")
async def ocr(file: UploadFile):
    """
    Enhanced OCR endpoint that returns structured data.

    For PDFs, native text extraction is attempted first; only when the
    PDF has no embedded text is it rasterized and OCR'd page by page.
    Plain images go straight through Tesseract.

    Returns a dict with:
        success: bool — whether extraction + parsing completed.
        raw_text: extracted text (truncated to 1000 chars with "...").
        structured_data: dict produced by parse_cotton_report().
        json: structured_data serialized as pretty-printed JSON.
        error: error message (only on failure; success stays False).
    """
    logger.info(f"Received structured OCR request: {file.filename}")
    try:
        file_data = await file.read()
        ext = file.filename.lower()
        ocr_text = ""

        if ext.endswith(".pdf"):
            # Try native text extraction first — much faster than OCR.
            reader = PdfReader(io.BytesIO(file_data))
            direct_text = "".join(page.extract_text() or "" for page in reader.pages)
            if direct_text.strip():
                logger.info("Using native PDF text")
                ocr_text = direct_text
            else:
                # Scanned PDF: rasterize and OCR each page.
                logger.info("Using OCR for scanned PDF")
                images = convert_from_bytes(file_data)
                for i, img in enumerate(images):
                    logger.info(f"OCR page {i+1}/{len(images)}")
                    ocr_text += pytesseract.image_to_string(img) + "\n"
        else:
            # Any non-PDF upload is treated as an image.
            img = Image.open(io.BytesIO(file_data))
            ocr_text = pytesseract.image_to_string(img)

        # Parse structured fields out of the raw text.
        structured_data = parse_cotton_report(ocr_text)

        return {
            "success": True,
            # Truncate the echoed raw text so responses stay small.
            "raw_text": (ocr_text[:1000] + "...") if len(ocr_text) > 1000 else ocr_text,
            "structured_data": structured_data,
            "json": json.dumps(structured_data, indent=2, ensure_ascii=False)
        }
    except Exception as e:
        # Deliberate best-effort contract: report failure in the payload
        # (HTTP 200) instead of raising, so clients always get this shape.
        logger.error(f"Structured OCR failed: {e}", exc_info=True)
        return {
            "success": False,
            "error": str(e),
            "raw_text": "",
            "structured_data": {}
        }
# =============================
# 🧠 Smart OCR
# =============================
# @app.post("/ocr")
# async def ocr(file: UploadFile):
# logger.info(f"Received OCR request: {file.filename}")
# try:
# file_data = await file.read()
# ext = file.filename.lower()
# # --------- PDF with native text ---------
# if ext.endswith(".pdf"):
# logger.info("PDF detected → Extracting native text first")
# reader = PdfReader(io.BytesIO(file_data))
# direct_text = "".join(
# page.extract_text() or "" for page in reader.pages
# )
# if direct_text.strip():
# logger.info("Native PDF text found → No OCR needed")
# return {"ocr_text": direct_text}
# # -------- Fallback: scanned PDF OCR --------
# logger.info("No native text → PDF treated as scanned → OCR")
# from pdf2image import convert_from_bytes
# images = convert_from_bytes(file_data)
# text = ""
# for i, img in enumerate(images):
# logger.info(f"OCR page {i+1}/{len(images)}")
# text += pytesseract.image_to_string(img) + "\n"
# return {"ocr_text": text}
# # --------- Image file OCR ---------
# logger.info("Image detected → Running OCR")
# img = Image.open(io.BytesIO(file_data))
# text = pytesseract.image_to_string(img)
# return {"ocr_text": text}
# except Exception as e:
# logger.error(f"OCR failed: {e}", exc_info=True)
# raise HTTPException(status_code=500, detail=str(e))
# ============================= # =============================
# 🧱 Structure / Layout # 🧱 Structure / Layout