Files
tradon/modules/purchase_trade/incoming.py
2025-12-26 13:11:43 +00:00

566 lines
19 KiB
Python

import logging
import re
from datetime import datetime
from decimal import Decimal, InvalidOperation

from trytond.model import ModelSQL, ModelView, Workflow, fields
from trytond.pool import Pool
from trytond.pyson import Bool, Eval, Id, If, PYSONEncoder
from trytond.transaction import Transaction
from trytond.wizard import Wizard, StateView, StateTransition, Button
class LCIncoming(ModelSQL, ModelView, Workflow):
    """LC Incoming

    Letter of credit received for a sale.  The record follows a small
    workflow (draft -> submitted -> approved, with cancellation) and can be
    filled in from the text of a SWIFT MT700 message: setting
    ``swift_execute`` on create/write triggers the parsing of ``swift_text``
    (or of ``swift_file`` when no text is stored yet).
    """
    __name__ = 'lc.letter.incoming'

    name = fields.Char('Number')
    type = fields.Selection([
        ('documentary', 'Documentary LC'),
        ('standby', 'Standby LC'),
        ], 'Type', required=True)
    sale = fields.Many2One('sale.sale', 'Sale')
    company = fields.Many2One('company.company', 'Company')
    applicant = fields.Many2One('party.party', 'Applicant')
    beneficiary = fields.Many2One('party.party', 'Beneficiary')

    # Banks
    issuing_bank = fields.Many2One('party.party', 'Issuing Bank')
    advising_bank = fields.Many2One('party.party', 'Advising Bank')
    confirming_bank = fields.Many2One('party.party', 'Confirming Bank')
    reimbursing_bank = fields.Many2One('party.party', 'Reimbursing Bank')

    # Amounts and conditions
    amount = fields.Numeric('Amount', digits=(16, 2))
    currency = fields.Many2One('currency.currency', 'Currency')
    tolerance_plus = fields.Numeric('Tolerance + %', digits=(6, 2))
    tolerance_minus = fields.Numeric('Tolerance - %', digits=(6, 2))

    # Delivery conditions
    incoterm = fields.Many2One('incoterm.incoterm', 'Incoterm')
    port_of_loading = fields.Many2One('stock.location', 'Port of Loading')
    port_of_discharge = fields.Many2One(
        'stock.location', 'Port of Discharge')
    final_destination = fields.Many2One(
        'stock.location', 'Final Destination')
    partial_shipment = fields.Selection([
        (None, ''),
        ('allowed', 'Allowed'),
        ('not_allowed', 'Not Allowed'),
        ], 'Partial Shipment')
    transhipment = fields.Selection([
        (None, ''),
        ('allowed', 'Allowed'),
        ('not_allowed', 'Not Allowed'),
        ], 'Transhipment')

    # Critical dates
    latest_shipment_date = fields.Date('Latest Shipment Date')
    issue_date = fields.Date('Issue Date')
    expiry_date = fields.Date('Expiry Date')
    expiry_place = fields.Char('Expiry Place')
    presentation_days = fields.Integer('Presentation Days')

    # Rules and conditions
    ruleset = fields.Selection([
        (None, ''),
        ('ucp600', 'UCP 600'),
        ('isp98', 'ISP 98'),
        ('urdg758', 'URDG 758'),
        ], 'Ruleset')
    required_documents = fields.Many2Many(
        'contract.document.type', 'lc_in', 'doc_type', 'Required Documents')

    # Main workflow
    state = fields.Selection([
        (None, ''),
        ('draft', 'Draft'),
        ('submitted', 'Submitted'),
        ('approved', 'Approved'),
        ('cancelled', 'Cancelled'),
        ], 'State', readonly=True)
    version = fields.Integer('Version', readonly=True)

    # Documents and attachments
    attachments = fields.One2Many(
        'ir.attachment', 'resource', 'Attachments',
        domain=[('resource', '=', Eval('id'))], depends=['id'])

    # Technical fields
    swift_message = fields.Text('SWIFT Message')
    swift_type = fields.Char('SWIFT Type')
    bank_reference = fields.Char('Bank Reference')
    our_reference = fields.Char('Our Reference')

    # Sale-specific fields
    receiving_bank = fields.Many2One('party.party', 'Receiving Bank')
    confirming_bank_required = fields.Boolean('Confirmation Required')
    confirmation_cost = fields.Numeric('Confirmation Cost', digits=(16, 2))

    # Analysis of the received LC
    risk_level = fields.Selection([
        (None, ''),
        ('low', 'Low Risk'),
        ('medium', 'Medium Risk'),
        ('high', 'High Risk'),
        ], 'Risk Level')
    conditions_analysis = fields.Text('Conditions Analysis')
    discrepancies_found = fields.One2Many(
        'lc.discrepancy', 'lc', 'Discrepancies Found')

    # Document preparation
    documents_prepared = fields.One2Many(
        'lc.document.prepared', 'lc', 'Documents Prepared')
    presentation_date = fields.Date('Presentation Date')
    presentation_bank = fields.Many2One('party.party', 'Presentation Bank')

    # Presentation outcome
    acceptance_date = fields.Date('Acceptance Date')
    payment_date = fields.Date('Payment Date')
    refusal_reason = fields.Text('Refusal Reason')

    # SWIFT import: source file, trigger flag and raw message text
    swift_file = fields.Many2One('document.incoming', "Swift file")
    swift_execute = fields.Boolean("Import")
    swift_text = fields.Text("Message")

    @classmethod
    def __setup__(cls):
        """Register the workflow transitions and the button states."""
        super(LCIncoming, cls).__setup__()
        # Extend (rather than overwrite) the set so that transitions
        # registered by a parent class or mixin are preserved.
        cls._transitions |= {
            ('draft', 'submitted'),
            ('submitted', 'approved'),
            ('draft', 'cancelled'),
            ('submitted', 'cancelled'),
            ('cancelled', 'draft'),
            }
        cls._buttons.update({
            'cancel': {
                'invisible': Eval('state').in_(['cancelled', 'approved']),
                'depends': ['state'],
                },
            'draft': {
                'invisible': Eval('state') != 'cancelled',
                'depends': ['state'],
                },
            'submit': {
                'invisible': Eval('state') != 'draft',
                'depends': ['state'],
                },
            'approve': {
                'invisible': Eval('state') != 'submitted',
                'depends': ['state'],
                },
            })

    def import_swift(self, text):
        """Parse ``text`` and return the values to write on this record.

        Nothing is written here.  Only keys that are attributes of the
        record and whose value is neither ``None`` nor the empty string are
        returned.
        """
        extracted = self._parse_swift_mt700(text)
        return {
            field: value
            for field, value in extracted.items()
            if hasattr(self, field) and value is not None and value != ''
            }

    @classmethod
    def _process_swift_for_record(cls, lc, initial_vals=None):
        """Run the SWIFT import for one record when it was requested.

        ``initial_vals`` is the dict of values passed to the originating
        ``create``/``write`` call (may be ``None``).  When it contains
        ``swift_execute`` that value decides whether to run; otherwise the
        stored flag of ``lc`` is used.

        The resulting values are written through the parent ``write`` so
        that the override of ``write`` in this class is not re-entered.
        """
        if initial_vals and 'swift_execute' in initial_vals:
            should_execute = bool(initial_vals['swift_execute'])
        else:
            should_execute = bool(lc.swift_execute)
        if not should_execute:
            return

        update_vals = {}
        # Fill swift_text from the attached file only when no text is
        # already stored on the record.
        if lc.swift_file and not lc.swift_text:
            try:
                update_vals['swift_text'] = lc.swift_file.data.decode(
                    'utf-8', errors='ignore')
            except AttributeError:
                # The file has no binary payload (None or not bytes):
                # nothing to parse, but keep a trace instead of hiding it.
                logging.getLogger(__name__).warning(
                    "SWIFT file of LC %s has no decodable data", lc.id)

        text_to_parse = update_vals.get('swift_text') or lc.swift_text
        if text_to_parse:
            update_vals.update(lc.import_swift(text_to_parse))

        # Always reset the trigger flag and move the LC to 'submitted'.
        update_vals['swift_execute'] = False
        update_vals['state'] = 'submitted'
        super(LCIncoming, cls).write([lc], update_vals)

    @classmethod
    def create(cls, vals_list):
        """Create the records, then run the SWIFT import where requested."""
        lcs = super(LCIncoming, cls).create(vals_list)
        for lc, vals in zip(lcs, vals_list):
            cls._process_swift_for_record(lc, initial_vals=vals)
        return lcs

    @classmethod
    def write(cls, lcs, vals, *args):
        """Write the values, then run the SWIFT import where requested.

        Accepts the variadic form ``write(records1, values1, records2,
        values2, ...)``; each group of records is processed with the values
        that were written on it.
        """
        super(LCIncoming, cls).write(lcs, vals, *args)
        actions = iter((lcs, vals) + args)
        for records, values in zip(actions, actions):
            for lc in records:
                cls._process_swift_for_record(lc, initial_vals=values)

    @staticmethod
    def default_state():
        """Default workflow state of a new LC."""
        return 'draft'

    @staticmethod
    def default_version():
        """Default version number of a new LC."""
        return 1

    def get_rec_name(self, name):
        """Return the LC number, or a label built from the creation date."""
        if self.name:
            return self.name
        return f"LC - {self.create_date}"

    @classmethod
    def search_rec_name(cls, name, clause):
        """Search the record name on the ``name`` (number) field."""
        return [('name',) + tuple(clause[1:])]

    @classmethod
    @ModelView.button
    @Workflow.transition('cancelled')
    def cancel(cls, lcs):
        """Move the LCs to 'cancelled' (state set by the transition)."""
        pass

    @classmethod
    @ModelView.button
    @Workflow.transition('draft')
    def draft(cls, lcs):
        """Move the LCs back to 'draft' (state set by the transition)."""
        pass

    @classmethod
    @ModelView.button
    @Workflow.transition('submitted')
    def submit(cls, lcs):
        """Move the LCs to 'submitted'.

        The state itself is written by ``Workflow.transition``; no explicit
        write is needed here.
        """
        pass

    @classmethod
    @ModelView.button
    @Workflow.transition('approved')
    def approve(cls, lcs):
        """Move the LCs to 'approved'.

        The state itself is written by ``Workflow.transition``; no explicit
        write is needed here.
        """
        pass

    @classmethod
    def create_from_sale(cls, sale_id):
        """Return default values for an LC created from the given sale.

        The sale is only read; no LC is created here.  Missing loading or
        discharge locations on the sale yield ``None`` instead of an error.
        """
        Sale = Pool().get('sale.sale')
        sale = Sale(sale_id)
        from_location = sale.from_location
        to_location = sale.to_location
        return {
            'sale': sale.id,
            'company': sale.company.id,
            'applicant': sale.party.id,
            'beneficiary': sale.company.party.id,
            'currency': sale.currency.id if sale.currency else None,
            'port_of_loading': from_location.id if from_location else None,
            'port_of_discharge': to_location.id if to_location else None,
            'state': 'draft',
            'type': 'documentary',
            'risk_level': 'medium',
            }

    def _parse_swift_mt700(self, swift_text):
        """Extract LC values from the text of a SWIFT MT700 message.

        Returns a dict keyed by field name.  Dates are ``datetime.date``,
        the amount is a ``Decimal``, parties and locations are records found
        by name, and the shipment clauses are normalised to the selection
        keys of this model.  Tags that are absent or cannot be interpreted
        are simply left out; an unparsable amount is reported under the
        ``amount_raw`` key.
        """
        data = {}
        if not swift_text:
            return data
        flags = re.MULTILINE | re.IGNORECASE
        logger = logging.getLogger(__name__)

        def first_group(pattern):
            match = re.search(pattern, swift_text, flags)
            return match.group(1).strip() if match else None

        # Plain text values.  For the expiry place only spaces/tabs may
        # separate the date from the place, so that a missing place does not
        # make the pattern capture the following tag line.
        text_patterns = {
            'name': r':20:(.+)',
            'expiry_place': r':31D:\d{6}[ \t]*(.+)',
            }
        for field, pattern in text_patterns.items():
            value = first_group(pattern)
            if value:
                data[field] = value

        # Dates in YYMMDD format.
        date_patterns = {
            'issue_date': r':31C:(\d{6})',
            'expiry_date': r':31D:(\d{6})',
            'latest_shipment_date': r':44C:(\d{6})',
            }
        for field, pattern in date_patterns.items():
            value = first_group(pattern)
            parsed = self._parse_swift_date(value) if value else None
            if parsed:
                data[field] = parsed

        # Currency code followed by the amount.
        match = re.search(
            r':32B:([A-Z]{3})\s*([\d,.\s]+)', swift_text, flags)
        if match:
            amount_str = match.group(2)
            try:
                data['amount'] = self._parse_swift_amount(amount_str)
            except InvalidOperation:
                # Keep the raw text for debugging when parsing fails.
                data['amount_raw'] = amount_str
            else:
                # The pattern is case-insensitive; currency codes are
                # searched in upper case.
                data['currency'] = self._find_currency(
                    match.group(1).upper())

        # Shipment clauses mapped to the selection keys.
        for field, pattern in (
                ('partial_shipment', r':43P:(.+)'),
                ('transhipment', r':43T:(.+)')):
            value = first_group(pattern)
            normalized = self._normalize_swift_allowance(value)
            if normalized:
                data[field] = normalized

        value = first_group(r':48:(\d+)')
        if value:
            data['presentation_days'] = int(value)

        # Records looked up by name: (field, tag, model to search).
        # NOTE(review): advising_bank is read from :53A: as before; confirm
        # against the MT700 specification that this is the intended tag.
        record_tags = (
            ('applicant', ':50:', 'party.party'),
            ('beneficiary', ':59:', 'party.party'),
            ('issuing_bank', ':52A:', 'party.party'),
            ('advising_bank', ':53A:', 'party.party'),
            ('port_of_loading', ':44E:', 'stock.location'),
            ('port_of_discharge', ':44F:', 'stock.location'),
            )
        for field, tag, model_name in record_tags:
            try:
                record = self._extract_from_swift(
                    swift_text, tag, model_name)
            except Exception:
                # Best effort: a failing lookup must not abort the whole
                # import, but it is logged instead of silently dropped.
                logger.exception(
                    "SWIFT lookup failed for field %s (tag %s)", field, tag)
                record = None
            if record:
                data[field] = record
        return data

    @staticmethod
    def _parse_swift_date(date_str):
        """Convert a YYMMDD string (read as 20YY) to a date, else None."""
        try:
            return datetime(
                2000 + int(date_str[0:2]),
                int(date_str[2:4]),
                int(date_str[4:6])).date()
        except ValueError:
            return None

    @staticmethod
    def _parse_swift_amount(amount_str):
        """Convert a SWIFT amount (comma as decimal mark) to a Decimal.

        Raises ``decimal.InvalidOperation`` when the text is not a number.
        """
        cleaned = ''.join(amount_str.split()).replace(',', '.')
        return Decimal(cleaned)

    @staticmethod
    def _normalize_swift_allowance(raw):
        """Map a 43P/43T clause to 'allowed' / 'not_allowed', else None."""
        if not raw:
            return None
        text = ' '.join(raw.upper().split())
        if re.search(r'\b(NOT|PROHIBITED|FORBIDDEN)\b', text):
            return 'not_allowed'
        if re.search(r'\b(ALLOWED|PERMITTED)\b', text):
            return 'allowed'
        return None

    def _extract_from_swift(self, swift_text, field_tag, model_name=None):
        """Find the record named on the first line following ``field_tag``.

        ``model_name`` restricts the lookup to ``'party.party'`` or
        ``'stock.location'``; ``None`` searches parties then locations.
        Returns the record found or ``None``.
        """
        pattern = re.escape(field_tag) + r'\s*(.+)'
        match = re.search(pattern, swift_text, re.MULTILINE | re.IGNORECASE)
        if not match:
            return None
        return self._find_record_from_text(match.group(1), model_name)

    def _extract_name(self, text):
        """Return the part of ``text`` before the first comma, stripped."""
        if not text:
            return None
        return text.split(',')[0].strip()

    def _find_record_from_text(self, record_text, model_name=None):
        """Look up a party or a location from free text.

        Parties are searched by exact name, then case-insensitively, then by
        the first word of the name contained anywhere in the party name.
        Locations are searched by exact name, then case-insensitively.
        ``model_name`` restricts the search to one of the two models;
        ``None`` tries parties first and locations last.
        """
        name = self._extract_name(record_text)
        if not name:
            return None
        pool = Pool()

        if model_name in (None, 'party.party'):
            Party = pool.get('party.party')
            keyword = name.split()[0]
            party_domains = (
                [('name', '=', name)],
                [('name', 'ilike', name)],
                # Partial match, e.g. "SAFTCO" inside "SAFTCO SA".
                [('name', 'ilike', f'%{keyword}%')],
                )
            for domain in party_domains:
                parties = Party.search(domain, limit=1)
                if parties:
                    return parties[0]

        if model_name in (None, 'stock.location'):
            Location = pool.get('stock.location')
            for operator in ('=', 'ilike'):
                locations = Location.search(
                    [('name', operator, name)], limit=1)
                if locations:
                    return locations[0]
        return None

    def _find_currency(self, currency_code):
        """Return the id of the currency with this code, or None."""
        if not currency_code:
            return None
        Currency = Pool().get('currency.currency')
        currencies = Currency.search(
            [('code', '=', currency_code)], limit=1)
        return currencies[0].id if currencies else None

    def analyze_conditions(self):
        """Assess the LC, store the result and return the risk level.

        Each failed check adds one risk: expired LC, latest shipment date
        in the past, no required document, no issuing bank.  No risk gives
        'low', one or two give 'medium', more give 'high'.  The record is
        saved.
        """
        today = datetime.now().date()
        risks = []
        if self.expiry_date and self.expiry_date < today:
            risks.append("LC expirée")
        if self.latest_shipment_date and self.latest_shipment_date < today:
            risks.append("Date de shipment dépassée")
        if not self.required_documents:
            risks.append("Aucun document spécifié")
        if not self.issuing_bank:
            risks.append("Banque émettrice non spécifiée")

        if not risks:
            self.risk_level = 'low'
        elif len(risks) <= 2:
            self.risk_level = 'medium'
        else:
            self.risk_level = 'high'
        self.conditions_analysis = "\n".join(["RISKS:"] + risks)
        self.save()
        return self.risk_level

    def prepare_documents_checklist(self):
        """Create the checklist of documents to prepare for this LC.

        One pending entry is created per required document type that has
        no entry yet, so calling the method again does not duplicate the
        checklist.  Returns the number of entries created.
        """
        LCDocumentPrepared = Pool().get('lc.document.prepared')
        already_listed = {
            prepared.document_type.id
            for prepared in (self.documents_prepared or [])
            if prepared.document_type
            }
        documents_to_prepare = [{
            'lc': self.id,
            'document_type': doc_type.id,
            'required': True,
            'status': 'pending',
            }
            for doc_type in (self.required_documents or [])
            if doc_type.id not in already_listed]
        if documents_to_prepare:
            LCDocumentPrepared.create(documents_to_prepare)
        self.save()
        return len(documents_to_prepare)
class ImportSwiftStart(ModelView):
    """Import SWIFT Start

    View model holding the file uploaded for a SWIFT MT700 import.
    """
    __name__ = 'lc.import_swift.start'

    # Binary content of the uploaded file (mandatory).
    file_ = fields.Binary('SWIFT MT700 file', required=True)
    # Name of the uploaded file.
    filename = fields.Char('Filename')
class ImportSwift(Wizard):
    """Import SWIFT

    Wizard that reads an uploaded SWIFT MT700 file and writes the values
    extracted from it on the active ``lc.letter.incoming`` record, which is
    then moved to the 'submitted' state.
    """
    __name__ = 'lc.import_swift'

    start = StateView(
        'lc.import_swift.start',
        'purchase_trade.import_swift_start_view_form',
        [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('Import', 'import_', 'tryton-ok', default=True),
            ])
    import_ = StateTransition()

    def transition_import_(self):
        """Parse the uploaded file and update the active LC."""
        LC = Pool().get('lc.letter.incoming')
        active_id = Transaction().context.get('active_id')
        if active_id is None:
            # No record to update: close the wizard without writing.
            return 'end'
        lc = LC(active_id)

        # Decode leniently, like the model does for its attached SWIFT
        # file, so that a stray byte does not abort the import.
        content = self.start.file_ or b''
        text = content.decode('utf-8', errors='ignore')

        # import_swift already keeps only the usable, non-empty values;
        # the state change is sent in the same write.
        update_vals = lc.import_swift(text)
        update_vals['state'] = 'submitted'
        LC.write([lc], update_vals)
        return 'end'

    def end(self):
        """Return the client action run when the wizard ends."""
        return 'reload'
class AnalyzeConditions(Wizard):
    """Analyze Conditions

    Wizard without a form: runs ``analyze_conditions`` on every selected
    record, then asks the client to reload.
    """
    __name__ = "lc.analyze.conditions"

    start = StateTransition()

    def transition_start(self):
        """Analyze each selected record and finish the wizard."""
        # Handle the whole selection; an empty selection is a no-op
        # instead of an IndexError.
        for record in self.records:
            record.analyze_conditions()
        return 'end'

    def end(self):
        """Return the client action run when the wizard ends."""
        return 'reload'
class PrepareDocuments(Wizard):
    """Prepare Documents

    Wizard without a form: runs ``prepare_documents_checklist`` on every
    selected record, then asks the client to reload.
    """
    __name__ = "lc.prepare.doc"

    start = StateTransition()

    def transition_start(self):
        """Build the checklist of each selected record and finish."""
        # Handle the whole selection; an empty selection is a no-op
        # instead of an IndexError.
        for record in self.records:
            record.prepare_documents_checklist()
        return 'end'

    def end(self):
        """Return the client action run when the wizard ends."""
        return 'reload'