from trytond.model import ModelSQL, ModelView, Workflow, fields from trytond.pool import Pool from trytond.transaction import Transaction from trytond.pyson import Bool, Eval, Id, If, PYSONEncoder from trytond.wizard import Wizard, StateView, StateTransition, Button from datetime import datetime import re class LCIncoming(ModelSQL, ModelView, Workflow): 'LC Incoming' __name__ = 'lc.letter.incoming' name = fields.Char('Number') type = fields.Selection([ ('documentary', 'Documentary LC'), ('standby', 'Standby LC') ], 'Type', required=True) sale = fields.Many2One('sale.sale', 'Sale') company = fields.Many2One('company.company', 'Company') applicant = fields.Many2One('party.party', 'Applicant') beneficiary = fields.Many2One('party.party', 'Beneficiary') # Banques issuing_bank = fields.Many2One('party.party', 'Issuing Bank') advising_bank = fields.Many2One('party.party', 'Advising Bank') confirming_bank = fields.Many2One('party.party', 'Confirming Bank') reimbursing_bank = fields.Many2One('party.party', 'Reimbursing Bank') # Montants et conditions amount = fields.Numeric('Amount', digits=(16, 2)) currency = fields.Many2One('currency.currency', 'Currency') tolerance_plus = fields.Numeric('Tolerance + %', digits=(6, 2)) tolerance_minus = fields.Numeric('Tolerance - %', digits=(6, 2)) # Conditions de livraison incoterm = fields.Many2One('incoterm.incoterm', 'Incoterm') port_of_loading = fields.Many2One('stock.location','Port of Loading') port_of_discharge = fields.Many2One('stock.location','Port of Discharge') final_destination = fields.Many2One('stock.location','Final Destination') partial_shipment = fields.Selection([ (None, ''), ('allowed', 'Allowed'), ('not_allowed', 'Not Allowed') ], 'Partial Shipment') transhipment = fields.Selection([ (None, ''), ('allowed', 'Allowed'), ('not_allowed', 'Not Allowed') ], 'Transhipment') # Dates critiques latest_shipment_date = fields.Date('Latest Shipment Date') issue_date = fields.Date('Issue Date') expiry_date = fields.Date('Expiry Date') expiry_place = fields.Char('Expiry Place') presentation_days = fields.Integer('Presentation Days') # Règles et conditions ruleset = fields.Selection([ (None, ''), ('ucp600', 'UCP 600'), ('isp98', 'ISP 98'), ('urdg758', 'URDG 758') ], 'Ruleset') required_documents = fields.Many2Many( 'contract.document.type', 'lc_in', 'doc_type', 'Required Documents') # Workflow principal state = fields.Selection([ (None, ''), ('draft', 'Draft'), ('submitted', 'Submitted'), ('approved', 'Approved'), ('cancelled', 'Cancelled') ], 'State', readonly=True) version = fields.Integer('Version', readonly=True) # Documents et pièces jointes attachments = fields.One2Many('ir.attachment', 'resource', 'Attachments', domain=[('resource', '=', Eval('id'))], depends=['id']) # Champs techniques swift_message = fields.Text('SWIFT Message') swift_type = fields.Char('SWIFT Type') bank_reference = fields.Char('Bank Reference') our_reference = fields.Char('Our Reference') # Champs spécifiques Vente receiving_bank = fields.Many2One('party.party', 'Receiving Bank') confirming_bank_required = fields.Boolean('Confirmation Required') confirmation_cost = fields.Numeric('Confirmation Cost', digits=(16, 2)) # Analyse de la LC reçue risk_level = fields.Selection([ (None, ''), ('low', 'Low Risk'), ('medium', 'Medium Risk'), ('high', 'High Risk') ], 'Risk Level') conditions_analysis = fields.Text('Conditions Analysis') discrepancies_found = fields.One2Many('lc.discrepancy', 'lc', 'Discrepancies Found') # Préparation documents documents_prepared = fields.One2Many('lc.document.prepared', 'lc', 'Documents Prepared') presentation_date = fields.Date('Presentation Date') presentation_bank = fields.Many2One('party.party', 'Presentation Bank') # Résultat présentation acceptance_date = fields.Date('Acceptance Date') payment_date = fields.Date('Payment Date') refusal_reason = fields.Text('Refusal Reason') swift_file = fields.Many2One('document.incoming',"Swift file") swift_execute = fields.Boolean("Import") swift_text = fields.Text("Message") @classmethod def __setup__(cls): super(LCIncoming, cls).__setup__() cls._transitions = set(( ('draft', 'submitted'), ('submitted', 'approved'), ('draft', 'cancelled'), ('submitted', 'cancelled'), ('cancelled', 'draft'), )) cls._buttons.update({ 'cancel': { 'invisible': Eval('state').in_(['cancelled', 'approved']), 'depends': ['state'], }, 'draft': { 'invisible': Eval('state') != 'cancelled', 'depends': ['state'], }, 'submit': { 'invisible': Eval('state') != 'draft', 'depends': ['state'], }, 'approve': { 'invisible': Eval('state') != 'submitted', 'depends': ['state'], }, }) def import_swift(self, text): """ Instance method: parse the text and return a dict of {field: value} NE PAS écrire ici. Retourne seulement les valeurs à écrire. """ extracted = self._parse_swift_mt700(text) return { field: value for field, value in extracted.items() if hasattr(self, field) and value is not None and value != '' } @classmethod def _process_swift_for_record(cls, lc, initial_vals=None): """ Traite un enregistrement unique : récupère swift_text si nécessaire, appelle import_swift et applique les writes via super().write (pour éviter de ré-appeler notre override write). `initial_vals` : dict des valeurs passées lors du create/write initial (peut être None). """ # Vérifier si on doit exécuter should_execute = False if initial_vals and 'swift_execute' in initial_vals: should_execute = bool(initial_vals.get('swift_execute')) else: should_execute = bool(lc.swift_execute) if not should_execute: return update_vals = {} # Si un fichier a été fourni et que swift_text n'est pas déjà présent, # remplir swift_text depuis le fichier if lc.swift_file and not lc.swift_text: try: update_vals['swift_text'] = lc.swift_file.data.decode('utf-8', errors='ignore') except Exception: # Si décodage échoue, on peut ignorer ou logguer; ici on ignore pass # Si on a du texte (soit déjà soit venant du fichier), extraire text_to_parse = update_vals.get('swift_text') or lc.swift_text if text_to_parse: extracted = lc.import_swift(text_to_parse) update_vals.update(extracted) # Toujours désactiver le flag et passer en submitted update_vals['swift_execute'] = False update_vals['state'] = 'submitted' # Appliquer les modifications via la méthode parente pour éviter récursion if update_vals: super(LCIncoming, cls).write([lc], update_vals) @classmethod def create(cls, vals_list): # Créer d'abord les enregistrements lcs = super(LCIncoming, cls).create(vals_list) # Pour chaque couple (record, vals) => traiter si swift_execute demandé for lc, vals in zip(lcs, vals_list): cls._process_swift_for_record(lc, initial_vals=vals) return lcs @classmethod def write(cls, lcs, vals): # Appeler le parent pour appliquer vals initiaux super(LCIncoming, cls).write(lcs, vals) # Puis traiter chaque record si nécessaire. # On passe `vals` pour savoir si swift_execute a été fourni dans l'appel initial. for lc in lcs: cls._process_swift_for_record(lc, initial_vals=vals) @staticmethod def default_state(): return 'draft' @staticmethod def default_version(): return 1 def get_rec_name(self, name): if self.name: return f"{self.name}" return f"LC - {self.create_date}" @classmethod def search_rec_name(cls, name, clause): return ['OR', ('name',) + tuple(clause[1:]), ] @classmethod @ModelView.button @Workflow.transition('cancelled') def cancel(cls, lcs): pass @classmethod @ModelView.button @Workflow.transition('draft') def draft(cls, lcs): pass @classmethod @ModelView.button @Workflow.transition('submitted') def submit(cls, lcs): cls.write(lcs, {'state': 'submitted'}) @classmethod @ModelView.button @Workflow.transition('approved') def approve(cls, lcs): for lc in lcs: cls.write([lc], {'state': 'approved'}) @classmethod def create_from_sale(cls, sale_id): """Méthode de base pour création depuis vente""" Sale = Pool().get('sale.sale') sale = Sale(sale_id) return { 'sale': sale.id, 'company': sale.company.id, 'applicant': sale.party.id, 'beneficiary': sale.company.party.id, 'currency': sale.currency.id if sale.currency else None, 'port_of_loading': sale.from_location.id, 'port_of_discharge': sale.to_location.id, 'state': 'draft', 'type': 'documentary', 'risk_level': 'medium', } def _parse_swift_mt700(self, swift_text): """Parse le message SWIFT MT700""" data = {} # Ne pas appeler les extracteurs ici — passer des callables patterns = { 'name': r':20:(.+)', 'issue_date': r':31C:(\d{6})', 'expiry_date': r':31D:(\d{6})', 'expiry_place': r':31D:\d{6}\s*(.+)', # callables qui prendront swift_text en argument 'applicant': lambda txt: self._extract_from_swift(txt, ':50:'), 'beneficiary': lambda txt: self._extract_from_swift(txt, ':59:'), 'amount': r':32B:([A-Z]{3})\s*([\d,.\s]+)', 'issuing_bank': lambda txt: self._extract_from_swift(txt, ':52A:'), 'advising_bank': lambda txt: self._extract_from_swift(txt, ':53A:'), 'partial_shipment': r':43P:(.+)', 'transhipment': r':43T:(.+)', 'port_of_loading': lambda txt: self._extract_from_swift(txt, ':53A:'), 'port_of_discharge': lambda txt: self._extract_from_swift(txt, ':53A:'), 'latest_shipment_date': r':44C:(\d{6})', 'presentation_days': r':48:(\d+)', } for field, pattern in patterns.items(): if callable(pattern): # appelle le callable avec le texte swift try: value = pattern(swift_text) except Exception: value = None # n'ajouter que si on a une valeur non vide if value: data[field] = value.strip() if isinstance(value, str) else value else: match = re.search(pattern, swift_text, re.MULTILINE | re.IGNORECASE) if match: if field == 'amount': # group(1) = currency, group(2) = montant currency = match.group(1) amount_str = match.group(2) # normaliser le montant: enlever espaces et remplacer la virgule décimale par point cleaned = amount_str.replace(' ', '').replace(',', '.') try: data['amount'] = float(cleaned) data['currency'] = self._find_currency(currency) except ValueError: # si parse échoue, stocker en string pour debug data['amount_raw'] = amount_str elif field in ['issue_date', 'expiry_date', 'latest_shipment_date']: date_str = match.group(1) if len(date_str) == 6: # SWIFT YYMMDD -> interpréter comme 20YY (ou adapter si besoin) year = int(date_str[0:2]) + 2000 month = int(date_str[2:4]) day = int(date_str[4:6]) try: data[field] = datetime(year, month, day).date() except ValueError: # date invalide : ne rien faire ou logger pass else: # ici on sait qu'il y a un groupe capturant (1) data[field] = match.group(1).strip() return data def _extract_from_swift(self, swift_text, field_tag): """Extrait les informations de partie depuis SWIFT""" pattern = field_tag + r'\s*(.+)' match = re.search(pattern, swift_text, re.MULTILINE | re.IGNORECASE) if match: party_name = match.group(1).strip().split(',')[0].strip() return self._find_record_from_text(party_name) return None def _extract_name(self, text): if not text: return None # On prend avant la première virgule return text.split(',')[0].strip() def _find_record_from_text(self, record_text): Party = Pool().get('party.party') Location = Pool().get('stock.location') if not record_text: return None # Extraire le nom name = self._extract_name(record_text) if not name: return None # 1) Exact match parties = Party.search([ ('name', '=', name), ]) if parties: return parties[0] # 2) Match insensible aux accents et majuscules parties = Party.search([ ('name', 'ilike', name), ]) if parties: return parties[0] # 3) Match partiel (ex: "SAFTCO" dans "SAFTCO SA") keyword = name.split()[0] # Premier mot du nom parties = Party.search([ ('name', 'ilike', f'%{keyword}%'), ]) if parties: return parties[0] locations = Location.search([ ('name', '=', name), ]) if locations: return locations[0] # Aucun match trouvé return None def _find_currency(self, currency_code): """Trouve la devise par son code""" Currency = Pool().get('currency.currency') currencies = Currency.search([('code', '=', currency_code)], limit=1) return currencies[0].id if currencies else None def analyze_conditions(self): """Analyse automatique des conditions de la LC""" analysis = [] risks = [] lc = self # Vérification dates if lc.expiry_date and lc.expiry_date < datetime.now().date(): risks.append("LC expirée") if lc.latest_shipment_date and lc.latest_shipment_date < datetime.now().date(): risks.append("Date de shipment dépassée") # Vérification documents if not lc.required_documents: risks.append("Aucun document spécifié") # Vérification banques if not lc.issuing_bank: risks.append("Banque émettrice non spécifiée") # Détermination niveau de risque if len(risks) == 0: lc.risk_level = 'low' elif len(risks) <= 2: lc.risk_level = 'medium' else: lc.risk_level = 'high' lc.conditions_analysis = "\n".join(analysis + ["RISKS:"] + risks) lc.save() return lc.risk_level def prepare_documents_checklist(self): """Prépare la checklist des documents à produire""" lc = self LCDocumentPrepared = Pool().get('lc.document.prepared') # Crée les entrées pour chaque document requis documents_to_prepare = [] if lc.required_documents: for doc_type in lc.required_documents: documents_to_prepare.append({ 'lc': lc.id, 'document_type': doc_type.id, 'required': True, 'status': 'pending' }) if documents_to_prepare: LCDocumentPrepared.create(documents_to_prepare) # lc.state = 'under_review' lc.save() return len(documents_to_prepare) class ImportSwiftStart(ModelView): 'Import SWIFT Start' __name__ = 'lc.import_swift.start' file_ = fields.Binary('SWIFT MT700 file', required=True) filename = fields.Char('Filename') class ImportSwift(Wizard): 'Import SWIFT' __name__ = 'lc.import_swift' start = StateView( 'lc.import_swift.start', 'purchase_trade.import_swift_start_view_form', [ Button('Cancel', 'end', 'tryton-cancel'), Button('Import', 'import_', 'tryton-ok', default=True), ] ) import_ = StateTransition() def transition_import_(self): LC = Pool().get('lc.letter.incoming') active_id = Transaction().context.get('active_id') lc = LC(active_id) content = self.start.file_ text = content.decode('utf-8') extracted = lc._parse_swift_mt700(text) update_vals = {} for field, value in extracted.items(): if hasattr(lc, field) and value: update_vals[field] = value # Met à jour tous les champs via write() if update_vals: LC.write([lc], update_vals) # Passe l'état via write() LC.write([lc], {'state': 'submitted'}) return 'end' def end(self): # Récupérer active_id du contexte ctx = Transaction().context active_id = ctx.get('active_id') or (ctx.get('active_ids') and ctx.get('active_ids')[0]) if not active_id: return 'reload' return { 'type': 'ir.actions.act_window', 'res_model': 'lc.letter.incoming', 'res_id': active_id, 'view_mode': 'form', 'target': 'new', } class AnalyzeConditions(Wizard): "Analyze Conditions" __name__ = "lc.analyze.conditions" start = StateTransition() def transition_start(self): self.records[0].analyze_conditions() return 'end' def end(self): return 'reload' class PrepareDocuments(Wizard): "Prepare Documents" __name__ = "lc.prepare.doc" start = StateTransition() def transition_start(self): self.records[0].prepare_documents_checklist() return 'end' def end(self): return 'reload'