Initial import

This commit is contained in:
root
2025-12-28 16:48:23 +00:00
commit 377ff3a613
4 changed files with 308 additions and 0 deletions

259
app.py Normal file
View File

@@ -0,0 +1,259 @@
from fastapi import FastAPI, UploadFile, HTTPException, Body
from PIL import Image
import pytesseract
from doctr.models import ocr_predictor
from doctr.io import DocumentFile
from PyPDF2 import PdfReader
import camelot
import spacy
import logging
import io
from logging.handlers import RotatingFileHandler
import re
LOG_PATH = "/var/log/automation-service.log"
file_handler = RotatingFileHandler(
LOG_PATH,
maxBytes=10*1024*1024,
backupCount=5,
encoding="utf-8"
)
file_handler.setFormatter(logging.Formatter(
"%(asctime)s - %(levelname)s - %(name)s - %(message)s"
))
# Configure root logger explicitly
root = logging.getLogger()
root.setLevel(logging.INFO)
root.addHandler(file_handler)
root.addHandler(logging.StreamHandler())
# Use root logger for your app
logger = logging.getLogger(__name__)
app = FastAPI()
logger.info("Loading models...")
nlp = spacy.load("en_core_web_sm")
predictor = ocr_predictor(pretrained=True)
logger.info("Models loaded successfully.")
# =============================
# 🧠 Smart OCR
# =============================
@app.post("/ocr")
async def ocr(file: UploadFile):
logger.info(f"Received OCR request: {file.filename}")
try:
file_data = await file.read()
ext = file.filename.lower()
# --------- PDF with native text ---------
if ext.endswith(".pdf"):
logger.info("PDF detected → Extracting native text first")
reader = PdfReader(io.BytesIO(file_data))
direct_text = "".join(
page.extract_text() or "" for page in reader.pages
)
if direct_text.strip():
logger.info("Native PDF text found → No OCR needed")
return {"ocr_text": direct_text}
# -------- Fallback: scanned PDF OCR --------
logger.info("No native text → PDF treated as scanned → OCR")
from pdf2image import convert_from_bytes
images = convert_from_bytes(file_data)
text = ""
for i, img in enumerate(images):
logger.info(f"OCR page {i+1}/{len(images)}")
text += pytesseract.image_to_string(img) + "\n"
return {"ocr_text": text}
# --------- Image file OCR ---------
logger.info("Image detected → Running OCR")
img = Image.open(io.BytesIO(file_data))
text = pytesseract.image_to_string(img)
return {"ocr_text": text}
except Exception as e:
logger.error(f"OCR failed: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
# =============================
# 🧱 Structure / Layout
# =============================
@app.post("/structure")
async def structure(file: UploadFile):
logger.info(f"Received structure request: {file.filename}")
try:
file_data = await file.read()
ext = file.filename.lower()
if ext.endswith(".pdf"):
doc = DocumentFile.from_pdf(file_data)
logger.info(f"Structure prediction on PDF ({len(doc)} pages)")
else:
img = Image.open(io.BytesIO(file_data)).convert("RGB")
doc = DocumentFile.from_images([img])
logger.info("Structure prediction on image")
res = predictor(doc)
return {"structure": str(res)}
except Exception as e:
logger.error(f"Structure extraction failed: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
# =============================
# 📊 Tables extraction (PDF only)
# =============================
@app.post("/tables")
async def tables(file: UploadFile):
logger.info(f"Received table extraction request: {file.filename}")
try:
file_data = await file.read()
buffer = io.BytesIO(file_data)
tables = camelot.read_pdf(buffer)
logger.info(f"Found {len(tables)} tables")
return {"tables": [t.df.to_dict() for t in tables]}
except Exception as e:
logger.error(f"Table extraction failed: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
def safe_search(pattern, text, default=None, group_index=1, context=""):
"""Recherche sécurisée avec logging en cas d'absence de correspondance."""
m = re.search(pattern, text, re.I | re.S)
if not m:
logger.warning("Pattern not found for %s: %s", context, pattern)
return default
try:
return m.group(group_index).strip()
except IndexError:
logger.warning("Group index %d not found for %s: %s", group_index, context, pattern)
return default
def section(text, start, end=None):
"""Extract a block of text between two headings, safely."""
pattern_start = re.escape(start)
if end:
pattern_end = re.escape(end)
reg = re.compile(pattern_start + r"(.*?)" + pattern_end, re.S | re.I)
else:
reg = re.compile(pattern_start + r"(.*)", re.S | re.I)
m = reg.search(text)
if not m:
logger.warning("Section not found: start='%s', end='%s'", start, end)
return ""
return m.group(1).strip()
def extract_field(text, label, default=None):
"""Extract a line of the form 'Label: value', safely."""
pattern = rf"{re.escape(label)}\s*:?[\s]+([^\n]+)"
return safe_search(pattern, text, default=default, context=f"field '{label}'")
def extract_report_metadata(text):
logger.info("Starting metadata extraction, text length=%d", len(text))
try:
# ----------- SECTIONS -----------
order_details = section(text, "Order details", "Weights")
invoice_section = section(text, "INVOICE WEIGHTS", "Bales Weighed")
landed_section = section(text, "Bales Weighed", "Outturn")
loss_section = section(text, "LOSS", "Invoice average")
avg_section = section(text, "Invoice average", "Comments")
signature_block = section(text, "Signed on")
# ----------- TOP INFO -----------
top_info = {
"produced_on": extract_field(text, "Produced On"),
"printed_date": extract_field(text, "Printed Date"),
"client_reference": extract_field(text, "Client Reference"),
"report_number": safe_search(r"(AHK\S+)", text, default="", context="report_number", group_index=1),
}
# ----------- ORDER DETAILS -----------
parties = {
"client": extract_field(order_details, "Client"),
"client_ref_no": extract_field(order_details, "Client Ref No"),
"buyer": extract_field(order_details, "Buyer"),
"destination": extract_field(order_details, "Destination"),
}
shipment = {
"total_bales": extract_field(order_details, "Total Bales"),
"vessel": extract_field(order_details, "Vessel"),
"voyage_no": extract_field(order_details, "Voy. No"),
"bl_no": extract_field(order_details, "B/L No"),
"bl_date": extract_field(order_details, "B/L Date"),
"growth": extract_field(order_details, "Growth"),
"arrival_date": extract_field(order_details, "Arrival Date"),
"first_weighing_date": extract_field(order_details, "First date of weighing"),
"last_weighing_date": extract_field(order_details, "Last Date of Weighing"),
"weighing_method": extract_field(order_details, "Weighing method"),
"tare_basis": extract_field(order_details, "Tare"),
}
# ----------- INVOICE SECTION -----------
invoice = {
"bales": extract_field(invoice_section, "Bales"),
"gross": extract_field(invoice_section, "Gross"),
"tare": extract_field(invoice_section, "Tare"),
"net": extract_field(invoice_section, "Net"),
}
# ----------- LANDED SECTION -----------
landed = {
"bales": extract_field(landed_section, "Bales"),
"gross": extract_field(landed_section, "Gross"),
"tare": extract_field(landed_section, "Tare"),
"net": extract_field(landed_section, "Net"),
}
# ----------- LOSS SECTION -----------
loss = {
"kg": extract_field(loss_section, "kg"),
"lb": extract_field(loss_section, "lb"),
"percent": extract_field(loss_section, "Percentage"),
}
# ----------- AVERAGES SECTION -----------
averages = {
"invoice_gross_per_bale": extract_field(avg_section, "Invoice average"),
"landed_gross_per_bale": extract_field(avg_section, "Landed average"),
}
# ----------- SIGNATURE -----------
signature = {
"signed_on": extract_field(signature_block, "Signed on"),
"signed_by": safe_search(r"\n([A-Za-z ]+)\nClient Services", signature_block, default="", context="signed_by"),
"role": "Client Services Coordinator",
"company": "Alfred H. Knight International Limited"
}
logger.info("Metadata extraction completed successfully")
return {
"report": top_info,
"parties": parties,
"shipment": shipment,
"weights": {
"invoice": invoice,
"landed": landed,
"loss": loss,
"averages": averages
},
"signature": signature
}
except Exception as e:
logger.exception("Unexpected error during metadata extraction")
raise HTTPException(status_code=500, detail=f"Metadata extraction failed: {e}")
@app.post("/metadata")
async def metadata(text: str = Body(..., embed=True)):
return extract_report_metadata(text)