# This file is part of Tryton. The COPYRIGHT file at the top level of # this repository contains the full copyright notices and license terms. from trytond.model import fields from trytond.pool import Pool, PoolMeta from trytond.pyson import Bool, Eval, Id from trytond.model import (ModelSQL, ModelView) from trytond.exceptions import UserWarning, UserError from trytond.transaction import Transaction, inactive_records from trytond.wizard import Button, StateTransition, StateView, Wizard, StateAction from decimal import getcontext, Decimal, ROUND_HALF_UP from sql.aggregate import Count, Max, Min, Sum, Avg, BoolOr from sql import Column, Literal from sql.functions import CurrentTimestamp, DateTrunc import datetime import logging import json logger = logging.getLogger(__name__) class Lot(ModelSQL, ModelView): "Lot" __name__ = 'lot.lot' _rec_name = 'lot_name' lot_name = fields.Char("Lot") number = fields.Char("Number", readonly=True) lot_qt = fields.Float("Quantity",required=False) lot_unit = fields.Many2One('product.uom', "Unit",required=False) lot_product = fields.Many2One('product.product', "Product") lot_type = fields.Selection([ ('virtual', 'Open'), ('physic', 'Physic'), ('loss', 'Loss'), ], 'Qt type') lot_status = fields.Selection([ ('forecast', 'Forecast'), ('loading', 'Loading'), ('transit', 'Transit'), ('destination', 'Destination'), ('stock', 'Stock'), ('delivered', 'Delivered') ], 'Where') lot_av = fields.Selection([ ('available', 'Available'), ('reserved', 'Reserved'), ('locked', 'Locked'), ('prov', 'Prov. inv'), ('invoiced', 'Invoiced') ], 'State') lot_quantity = fields.Function(fields.Numeric("Net weight",digits='line_unit'),'get_current_quantity') lot_premium = fields.Numeric("Premium", digits='line_unit') lot_premium_sup = fields.Numeric("Sup prem", digits='line_unit') lot_premium_tpl = fields.Numeric("Tpl prem", digits='line_unit') lot_premium_sale = fields.Numeric("Prem/Disc", digits='line_unit') lot_shipment_in = fields.Many2One('stock.shipment.in', "Shipment In") lot_shipment_out = fields.Many2One('stock.shipment.out', "Shipment Out") lot_shipment_internal = fields.Many2One('stock.shipment.internal', "Shipment Internal") lot_gross_quantity = fields.Function(fields.Numeric("Gross weight",digits='line_unit'),'get_current_gross_quantity') lot_parent = fields.Many2One('lot.lot',"Parent") lot_childs = fields.One2Many('lot.lot','lot_parent',"Childs") lot_himself = fields.Many2One('lot.lot',"Lot") lot_container = fields.Char("Container") lot_unit_line = fields.Many2One('product.uom', "Unit",required=True) lot_price = fields.Function(fields.Numeric("Price", digits='line_unit'),'get_lot_price') lot_price_sale = fields.Function(fields.Numeric("Price", digits='line_unit'),'get_lot_sale_price') lot_state = fields.Many2One('lot.qt.type',"Qt state") lot_hist = fields.One2Many('lot.qt.hist','lot',"Qt hist") lot_pur_inv_state = fields.Selection([ (None, ''), ('prov', 'Prov'), ('invoiced', 'Final') ], 'Pur. inv') lot_sale_inv_state = fields.Selection([ (None, ''), ('prov', 'Prov'), ('invoiced', 'Final') ], 'Sale inv') lot_price_ct_symbol = fields.Function(fields.Char(""),'get_price_ct_symbol') lot_price_ct_symbol_sale = fields.Function(fields.Char(""),'get_price_ct_symbol_sale') lot_price_ct_symbol_premium = fields.Function(fields.Char(""),'get_price_ct_symbol_premium') line_unit = fields.Function(fields.Many2One('product.uom',"Line unit"),'get_unit_line') lot_shipment_origin = fields.Function( fields.Reference( selection=[ ("stock.shipment.in", "In"), ("stock.shipment.out", "Out"), ("stock.shipment.internal", "Internal"), ], string="Shipment", ), "get_shipment_origin", ) lot_role = fields.Selection([ ('normal', 'Normal'), ('technical', 'Technical'), ], 'Role', required=True) split_operations = fields.One2Many( 'lot.split.merge', 'source_lot', 'Split/Merge Ops' ) split_graph = fields.Function( fields.Text('Split Graph'), 'get_split_graph' ) @classmethod def default_lot_role(cls): return 'normal' # --------------------------------------------------------------------- # Technical helpers # --------------------------------------------------------------------- def _check_split_allowed(self): if self.lot_role != 'normal': raise UserError('Only normal lots can be split.') if self.lot_type != 'physic': raise UserError('Only physical lots can be split.') if self.IsDelivered(): raise UserError('Delivered lots cannot be split.') def _create_technical_parent(self): Lot = Pool().get('lot.lot') parent = Lot( lot_name=f'TECH-{self.lot_name}', lot_role='technical', lot_type='virtual', lot_product=self.lot_product, lot_status=self.lot_status, lot_av='locked', ) Lot.save([parent]) return parent def _clone_hist_with_ratio(self, ratio): LotQtHist = Pool().get('lot.qt.hist') hist = [] for h in self.lot_hist: hist.append( LotQtHist( quantity_type=h.quantity_type, quantity=h.quantity * ratio, gross_quantity=h.gross_quantity * ratio, ) ) return hist def split_by_weight(self, splits): Date = Pool().get('ir.date') self._check_split_allowed() total_weight = sum(s['quantity'] for s in splits) lot_weight = self.get_current_quantity() diff = (lot_weight - total_weight).quantize(Decimal('0.00001')) if diff < 0: raise UserError('Split weight exceeds lot weight.') if diff != 0: raise UserError( f'Split does not fully consume the lot weight.\n' f'Remaining: {diff}' ) Lot = Pool().get('lot.lot') Split = Pool().get('lot.split.merge') children = [] # Parent becomes technical self.lot_av = 'locked' self.lot_role = 'technical' self.lot_name = 'TECH-' + self.lot_name line = self.line self.line = None Lot.save([self]) for s in splits: ratio = s['quantity'] / lot_weight lot_qt = None if self.lot_qt: lot_qt = (Decimal(self.lot_qt) * ratio).quantize( Decimal('0.00001'), rounding=ROUND_HALF_UP ) child = Lot( lot_name=s.get('name'), lot_role='normal', lot_type='physic', lot_qt=float(lot_qt) if lot_qt is not None else None, lot_unit=self.lot_unit, lot_product=self.lot_product, lot_status=self.lot_status, lot_av=self.lot_av, lot_unit_line=self.lot_unit_line, ) with Transaction().set_context(_skip_function_fields=True): Lot.save([child]) child.set_current_quantity(s['quantity'], s['quantity'], 1) child.lot_parent = self child.line = line Lot.save([child]) children.append(child) ops = [] for c in children: ops.append( Split( operation='split', source_lot=self, target_lot=c, lot_qt=c.lot_qt, quantity=c.get_current_quantity(), operation_date=Date.today(), ) ) Split.save(ops) return children @classmethod def merge_lots(cls, lots): Date = Pool().get('ir.date') if len(lots) < 2: raise UserError('At least two lots are required for merge.') # parent = lots[0].lot_parent # if not parent or parent.lot_role != 'technical': # raise UserError('Lots must share a technical parent.') product = lots[0].lot_product for l in lots: if l.lot_product != product: raise UserError('Cannot merge lots of different products.') Lot = Pool().get('lot.lot') Split = Pool().get('lot.split.merge') total_qt = sum([l.lot_qt or 0 for l in lots]) total_weight = sum([l.get_current_quantity() for l in lots]) ops = [] parents_name = "" for l in lots: l.lot_av = 'locked' l.lot_role = 'technical' l.line = None parents_name += l.lot_name + "/" ops.append( Split( operation='merge', source_lot=l, target_lot=merged, lot_qt=l.lot_qt, quantity=l.get_current_quantity(), operation_date=Date.today(), ) ) Lot.save(lots) Split.save(ops) merged = Lot( lot_name='MERGE-' + parents_name, lot_role='normal', lot_type='physic', lot_product=product, lot_qt=total_qt, lot_unit=lots[0].lot_unit, lot_unit_line=lots[0].lot_unit_line, ) merged.set_current_quantity(total_weight, total_weight, 1) Lot.save([merged]) return merged def get_split_graph(self, name=None): Split = Pool().get('lot.split.merge') nodes = {} edges = [] def add_lot_node(lot): if lot.id in nodes: return nodes[lot.id] = { 'id': f'lot.lot {lot.id}', 'label': f'{lot.lot_name}\n({lot.lot_role})', 'data_model': 'lot.lot', 'data_id': lot.id, 'shape': 'box' if lot.lot_role == 'technical' else 'ellipse', 'color': { 'background': '#FFD966' if lot.lot_role == 'technical' else '#A7C7E7', 'border': '#555', }, } def walk(lot): add_lot_node(lot) ops = Split.search([('source_lot', '=', lot.id)]) for op in ops: tgt = op.target_lot add_lot_node(tgt) edges.append({ 'from': f'lot.lot {lot.id}', 'to': f'lot.lot {tgt.id}', 'label': op.operation if op.operation else '', 'arrows': 'to', 'color': { 'color': '#267F82', 'highlight': '#1D5F62', 'hover': '#1D5F62' } }) walk(tgt) walk(self) # Calcul des relations parent-enfant from collections import defaultdict children_map = defaultdict(list) parent_map = {} for edge in edges: from_id = edge['from'] to_id = edge['to'] children_map[from_id].append(to_id) parent_map[to_id] = from_id # Trouver la racine (nœud sans parent) all_node_ids = {node['id'] for node in nodes.values()} root_candidates = all_node_ids - set(parent_map.keys()) if root_candidates: root_id = list(root_candidates)[0] else: root_id = f'lot.lot {self.id}' # Assigner les niveaux hiérarchiques levels = {} def assign_levels(node_id, level=0): if node_id in levels and levels[node_id] >= level: return levels[node_id] = level for child in children_map.get(node_id, []): assign_levels(child, level + 1) assign_levels(root_id) # Grouper les nœuds par niveau by_level = defaultdict(list) for node_id, level in levels.items(): by_level[level].append(node_id) # Calculer les positions - IMPORTANT: utiliser des valeurs positives plus grandes # Calculer les positions pour un graphe HORIZONTAL X_SPACING = 180 # espace entre les niveaux (gauche → droite) Y_SPACING = 100 # espace entre les nœuds d’un même niveau max_nodes_in_level = max(len(nodes) for nodes in by_level.values()) if by_level else 1 total_height = max_nodes_in_level * Y_SPACING for level, node_ids in by_level.items(): level_height = len(node_ids) * Y_SPACING start_y = (total_height - level_height) / 2 # Centrer verticalement for i, node_id in enumerate(node_ids): node_num = int(node_id.split()[-1]) # Niveau → X x_pos = level * X_SPACING + 300 # Répartition verticale y_pos = start_y + (i * Y_SPACING) + 100 nodes[node_num]['x'] = x_pos nodes[node_num]['y'] = y_pos nodes[node_num]['fixed'] = { 'x': True, 'y': True } # S'assurer que tous les nœuds ont des positions for node in nodes.values(): if 'x' not in node: node['x'] = 0 node['y'] = 0 node['fixed'] = {'x': True, 'y': True} data = { 'nodes': list(nodes.values()), 'edges': edges, 'physics': { 'enabled': False # Désactivé car positions fixes } } return 'visjson:' + json.dumps(data) def get_shipment_origin(self, name): if self.lot_shipment_in: return 'stock.shipment.in,' + str(self.lot_shipment_in.id) elif self.lot_shipment_out: return 'stock.shipment.out,' + str(self.lot_shipment_out.id) elif self.lot_shipment_internal: return 'stock.shipment.internal,' + str(self.lot_shipment_internal.id) return None def get_unit_line(self,name): if self.line: return self.line.unit return self.lot_unit_line def get_lot_inv(self): if self.invoice_line: return self.invoice_line.invoice.origin if self.invoice_line_prov: return self.invoice_line_prov.invoice.origin @classmethod def __setup__(cls): super(Lot, cls).__setup__() #cls._order.insert(0, ('lot_shipment', 'ASC')) @classmethod def default_lot_qt(cls): return Decimal(0) @classmethod def default_lot_av(cls): return 'available' @classmethod def default_lot_status(cls): return 'forecast' @classmethod def default_lot_type(cls): return 'physic' @classmethod def default_lot_gross_quantity(cls): return Decimal(0) @classmethod def default_lot_type(cls): return 'receive' @classmethod def default_lot_state(cls): LotQtType = Pool().get('lot.qt.type') lqt = LotQtType.search([('sequence','=',1)]) if lqt: return lqt[0].id def get_price_ct_symbol(self,name): if self.line: return str(self.line.currency.symbol) + "/" + str(self.line.unit.symbol) def get_price_ct_symbol_sale(self,name): if self.sale_line: return str(self.sale_line.currency.symbol) + "/" + str(self.sale_line.unit.symbol) def get_price_ct_symbol_premium(self,name): if self.line: if self.line.enable_linked_currency: return str(self.line.linked_currency.name) + "/" + str(self.line.linked_unit.symbol) else: return str(self.line.currency.symbol) + "/" + str(self.line.unit.symbol) def get_hist_quantity(self,state_id=0): qt = Decimal(0) gross_qt = Decimal(0) if self.lot_state: if self.lot_hist: if state_id != 0: st = state_id else: st = self.lot_state.id logger.info("GET_HIST_QT:%s",st) lot = [e for e in self.lot_hist if e.quantity_type.id == st][0] qt = round(lot.quantity,5) gross_qt = round(lot.gross_quantity,5) return qt, gross_qt def get_virtual_diff(self): Uom = Pool().get('product.uom') line = self.line if self.line else self.sale_line if line: if line.lots: physic_sum = Decimal(0) for l in line.lots: if l.lot_type == 'physic' : factor = None rate = None if l.lot_unit_line.category.id != l.line.unit.category.id: factor = 1 rate = 1 physic_sum += round(Decimal(Uom.compute_qty(Uom(l.lot_unit_line),float(l.get_current_quantity()),l.line.unit, True, factor, rate)),5) return line.quantity_theorical - physic_sum def get_current_quantity(self,name=None): # if self.lot_type == 'physic': qt, gross_qt = self.get_hist_quantity() return qt # else: # return self.get_virtual_diff() def get_current_quantity_converted(self,state_id=0,unit=None): Uom = Pool().get('product.uom') if not unit: unit = self.line.unit if self.line else self.sale_line.unit qt, gross_qt = self.get_hist_quantity(state_id) factor = None rate = None if self.lot_unit_line.category.id != unit.category.id: factor = 1 rate = 1 return round(Decimal(Uom.compute_qty(self.lot_unit_line, float(qt), unit, True, factor, rate)),5) def get_current_gross_quantity_converted(self,state_id=0,unit=None): Uom = Pool().get('product.uom') if not unit: unit = self.line.unit if self.line else self.sale_line.unit qt, gross_qt = self.get_hist_quantity(state_id) factor = None rate = None if self.lot_unit_line.category.id != unit.category.id: factor = 1 rate = 1 return round(Decimal(Uom.compute_qty(self.lot_unit_line, float(gross_qt), unit, True, factor, rate)),5) def get_current_gross_quantity(self,name=None): if self.lot_type == 'physic': qt, gross_qt = self.get_hist_quantity() return gross_qt else: return None def add_quantity_to_hist(self,net,gross,qt_type): LotQtHist = Pool().get('lot.qt.hist') lqh = LotQtHist() lqh.quantity_type = qt_type lqh.quantity = net logger.info("ADD_QUANTITY_TO_HIST:%s",gross) lqh.gross_quantity = gross lqh.lot = self return lqh def set_current_quantity(self, net, gross, seq=1): LotQtType = Pool().get('lot.qt.type') lqtt = LotQtType.search([('sequence', '=', seq)]) if not lqtt: return lot_hist = list(getattr(self, 'lot_hist', []) or []) existing = [e for e in lot_hist if e.quantity_type == lqtt[0]] if existing: hist = existing[0] hist.quantity = net logger.info("SET_CURRENT_HIST:%s",gross) hist.gross_quantity = gross else: lot_hist.append(self.add_quantity_to_hist(net, gross, lqtt[0])) self.lot_hist = lot_hist self.lot_state = lqtt[0] def get_unit(self,name): if self.line: return self.line.unit if self.sale_line: return self.sale_line.unit def get_lot_price(self,name=None): price = Decimal(0) if self.line: if self.line.enable_linked_currency and self.line.linked_price and self.line.linked_currency and self.line.price_type == 'priced': return self.line.get_price_linked_currency((self.lot_premium if self.lot_premium else 0)) else: return self.line.get_price((self.lot_premium if self.lot_premium else 0)) def get_lot_sale_price(self,name=None): price = Decimal(0) if self.sale_line: # if self.line.enable_linked_currency and self.line.linked_price and self.line.linked_currency and self.line.price_type == 'priced': # return self.line.get_price_linked_currency((self.lot_premium if self.lot_premium else 0)) # else: return self.sale_line.get_price((self.lot_premium if self.lot_premium else 0)) def get_sale_amount(self,name): round_context = getcontext() round_context.rounding = ROUND_HALF_UP price = self.get_lot_sale_price() return None def get_amount(self,name): round_context = getcontext() round_context.rounding = ROUND_HALF_UP price = self.get_lot_price() return None class QtType(ModelSQL, ModelView): "Type" __name__ = 'lot.qt.type' name = fields.Char("Name") sequence = fields.Integer("Sequence") class QtHist(ModelSQL, ModelView): "Quantities" __name__ = 'lot.qt.hist' lot = fields.Many2One( 'lot.lot', "Lot", ondelete='CASCADE', ) quantity_type = fields.Many2One('lot.qt.type',"Type") quantity = fields.Numeric("Net weight",digits=(1,5)) gross_quantity = fields.Numeric("Gross weight",digits=(1,5)) class LotSplitMerge(ModelSQL, ModelView): "Lot Split Merge" __name__ = 'lot.split.merge' operation = fields.Selection([ ('split', 'Split'), ('merge', 'Merge'), ], 'Operation', required=True) source_lot = fields.Many2One( 'lot.lot', 'Source Lot', required=True, ondelete='CASCADE' ) target_lot = fields.Many2One( 'lot.lot', 'Target Lot', required=True, ondelete='CASCADE' ) lot_qt = fields.Float('Elements count') quantity = fields.Numeric('Weight', digits=(16, 5)) operation_date = fields.Date('Date', required=True) reversed_by = fields.Many2One( 'lot.split.merge', 'Reversed By' ) class SplitLine(ModelView): "Split Line" __name__ = 'lot.split.wizard.line' name = fields.Char('Lot name') lot_qt = fields.Float('Quantity') weight = fields.Numeric('Weight', digits=(16,5)) class SplitWizardStart(ModelView): "Split Line Start" __name__ = 'lot.split.wizard.start' mode = fields.Selection([ ('equal', 'Equal parts'), ('manual', 'Manual'), ], 'Mode', required=True) parts = fields.Integer( 'Parts', states={'required': Eval('mode') == 'equal'} ) create_remainder = fields.Boolean( 'Create remainder lot', help='If checked, a remainder lot will be created when the split ' 'does not exactly match the original weight.' ) lines = fields.One2Many( 'lot.split.wizard.line', None, 'Lines', states={'invisible': Eval('mode') != 'manual'} ) class SplitWizard(Wizard): "Lot Split Wizard" __name__ = 'lot.split.wizard' start = StateView( 'lot.split.wizard.start', 'lot.split_wizard_start_view', [ Button('Cancel', 'end', 'tryton-cancel'), Button('Split', 'split', 'tryton-ok', default=True), ] ) split = StateTransition() def transition_split(self): Lot = Pool().get('lot.lot') lot = Lot(Transaction().context['active_id']) if self.start.mode == 'equal': part = lot.get_current_quantity() / self.start.parts splits = [ {'quantity': part, 'name': f'{lot.lot_name}-{i+1}'} for i in range(self.start.parts) ] else: splits = [ {'quantity': l.weight, 'lot_qt': l.lot_qt, 'name': l.name} for l in self.start.lines ] lot.split_by_weight(splits) return 'end' class SplitWizard(Wizard): __name__ = 'lot.split.wizard' start = StateView( 'lot.split.wizard.start', 'lot.split_wizard_start_view', [ Button('Cancel', 'end', 'tryton-cancel'), Button('Split', 'split', 'tryton-ok', default=True), ] ) split = StateTransition() def transition_split(self): Lot = Pool().get('lot.lot') lot = Lot(Transaction().context['active_id']) total_weight = lot.get_current_quantity().quantize( Decimal('0.00001'), rounding=ROUND_HALF_UP ) splits = [] allocated = Decimal('0.0') # ------------------------------------------------- # EQUAL MODE # ------------------------------------------------- if self.start.mode == 'equal': if not self.start.parts or self.start.parts <= 0: raise UserError('Number of parts must be greater than zero.') base = (total_weight / self.start.parts).quantize( Decimal('0.00001'), rounding=ROUND_HALF_UP ) for i in range(self.start.parts): qty = base if i == self.start.parts - 1: qty = total_weight - allocated # absorbe le reste splits.append({ 'quantity': qty, 'lot_qt': lot.lot_qt / self.start.parts if lot.lot_qt else None, 'name': f'{lot.lot_name}-{i + 1}', }) allocated += qty # ------------------------------------------------- # MANUAL MODE # ------------------------------------------------- else: for line in self.start.lines: if not line.weight or line.weight <= 0: raise UserError('Each line must have a positive weight.') splits.append({ 'quantity': line.weight, 'lot_qt': line.lot_qt, 'name': line.name, }) allocated += line.weight diff = (total_weight - allocated).quantize( Decimal('0.00001'), rounding=ROUND_HALF_UP ) if diff != 0: if self.start.create_remainder and diff > 0: splits.append({ 'quantity': diff, 'lot_qt': None, 'name': f'{lot.lot_name}-REMAINDER', }) else: raise UserError( f'Split does not match the lot weight.\n' f'Remaining difference: {diff}' ) lot.split_by_weight(splits) return 'end' class MergeLotsStart(ModelView): "Merge Lots Start" __name__ = 'lot.merge.wizard.start' confirm = fields.Boolean('Confirm merge') class MergeLots(Wizard): "Merge Lots" __name__ = 'lot.merge.wizard' start = StateView( 'lot.merge.wizard.start', 'lot.lot_merge_wizard_start_view', [ Button('Cancel', 'end', 'tryton-cancel'), Button('Merge', 'merge', 'tryton-ok', default=True), ] ) merge = StateTransition() def transition_merge(self): Lot = Pool().get('lot.lot') ids = Transaction().context['active_ids'] lots = Lot.browse(ids) Lot.merge_lots(lots) return 'end'