Files
tradon/modules/lot/lot.py
2026-03-16 10:48:54 +01:00

834 lines
28 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
from trytond.model import fields
from trytond.pool import Pool, PoolMeta
from trytond.pyson import Bool, Eval, Id
from trytond.model import (ModelSQL, ModelView)
from trytond.exceptions import UserWarning, UserError
from trytond.transaction import Transaction, inactive_records
from trytond.wizard import Button, StateTransition, StateView, Wizard, StateAction
from decimal import getcontext, Decimal, ROUND_HALF_UP
from sql.aggregate import Count, Max, Min, Sum, Avg, BoolOr
from sql import Column, Literal
from sql.functions import CurrentTimestamp, DateTrunc
import datetime
import logging
import json
logger = logging.getLogger(__name__)
class Lot(ModelSQL, ModelView):
"Lot"
__name__ = 'lot.lot'
_rec_name = 'lot_name'
lot_name = fields.Char("Lot")
number = fields.Char("Number", readonly=True)
lot_qt = fields.Float("Quantity",required=False)
lot_unit = fields.Many2One('product.uom', "Unit",required=False)
lot_product = fields.Many2One('product.product', "Product")
lot_type = fields.Selection([
('virtual', 'Open'),
('physic', 'Physic'),
('loss', 'Loss'),
], 'Qt type')
lot_status = fields.Selection([
('forecast', 'Forecast'),
('loading', 'Loading'),
('transit', 'Transit'),
('destination', 'Destination'),
('stock', 'Stock'),
('delivered', 'Delivered')
], 'Where')
lot_av = fields.Selection([
('available', 'Available'),
('reserved', 'Reserved'),
('locked', 'Locked'),
('prov', 'Prov. inv'),
('invoiced', 'Invoiced')
], 'State')
lot_quantity = fields.Function(fields.Numeric("Net weight",digits='line_unit'),'get_current_quantity')
lot_premium = fields.Numeric("Premium", digits='line_unit')
lot_premium_sup = fields.Numeric("Sup prem", digits='line_unit')
lot_premium_tpl = fields.Numeric("Tpl prem", digits='line_unit')
lot_premium_sale = fields.Numeric("Prem/Disc", digits='line_unit')
lot_shipment_in = fields.Many2One('stock.shipment.in', "Shipment In")
lot_shipment_out = fields.Many2One('stock.shipment.out', "Shipment Out")
lot_shipment_internal = fields.Many2One('stock.shipment.internal', "Shipment Internal")
lot_gross_quantity = fields.Function(fields.Numeric("Gross weight",digits='line_unit'),'get_current_gross_quantity')
lot_parent = fields.Many2One('lot.lot',"Parent")
lot_childs = fields.One2Many('lot.lot','lot_parent',"Childs")
lot_himself = fields.Many2One('lot.lot',"Lot")
lot_container = fields.Char("Container")
lot_unit_line = fields.Many2One('product.uom', "Unit",required=True)
lot_price = fields.Function(fields.Numeric("Price", digits='line_unit'),'get_lot_price')
lot_price_sale = fields.Function(fields.Numeric("Price", digits='line_unit'),'get_lot_sale_price')
lot_state = fields.Many2One('lot.qt.type',"Qt state")
lot_hist = fields.One2Many('lot.qt.hist','lot',"Qt hist")
lot_pur_inv_state = fields.Selection([
(None, ''),
('prov', 'Prov'),
('invoiced', 'Final')
], 'Pur. inv')
lot_sale_inv_state = fields.Selection([
(None, ''),
('prov', 'Prov'),
('invoiced', 'Final')
], 'Sale inv')
lot_price_ct_symbol = fields.Function(fields.Char(""),'get_price_ct_symbol')
lot_price_ct_symbol_sale = fields.Function(fields.Char(""),'get_price_ct_symbol_sale')
lot_price_ct_symbol_premium = fields.Function(fields.Char(""),'get_price_ct_symbol_premium')
line_unit = fields.Function(fields.Many2One('product.uom',"Line unit"),'get_unit_line')
lot_shipment_origin = fields.Function(
fields.Reference(
selection=[
("stock.shipment.in", "In"),
("stock.shipment.out", "Out"),
("stock.shipment.internal", "Internal"),
],
string="Shipment",
),
"get_shipment_origin",
)
lot_role = fields.Selection([
('normal', 'Normal'),
('technical', 'Technical'),
], 'Role', required=True)
split_operations = fields.One2Many(
'lot.split.merge', 'source_lot', 'Split/Merge Ops'
)
split_graph = fields.Function(
fields.Text('Split Graph'),
'get_split_graph'
)
@classmethod
def default_lot_role(cls):
return 'normal'
# ---------------------------------------------------------------------
# Technical helpers
# ---------------------------------------------------------------------
def _check_split_allowed(self):
if self.lot_role != 'normal':
raise UserError('Only normal lots can be split.')
if self.lot_type != 'physic':
raise UserError('Only physical lots can be split.')
if self.IsDelivered():
raise UserError('Delivered lots cannot be split.')
def _create_technical_parent(self):
Lot = Pool().get('lot.lot')
parent = Lot(
lot_name=f'TECH-{self.lot_name}',
lot_role='technical',
lot_type='virtual',
lot_product=self.lot_product,
lot_status=self.lot_status,
lot_av='locked',
)
Lot.save([parent])
return parent
def _clone_hist_with_ratio(self, ratio):
LotQtHist = Pool().get('lot.qt.hist')
hist = []
for h in self.lot_hist:
hist.append(
LotQtHist(
quantity_type=h.quantity_type,
quantity=h.quantity * ratio,
gross_quantity=h.gross_quantity * ratio,
)
)
return hist
def split_by_weight(self, splits):
Date = Pool().get('ir.date')
self._check_split_allowed()
total_weight = sum(s['quantity'] for s in splits)
lot_weight = self.get_current_quantity()
diff = (lot_weight - total_weight).quantize(Decimal('0.00001'))
if diff < 0:
raise UserError('Split weight exceeds lot weight.')
if diff != 0:
raise UserError(
f'Split does not fully consume the lot weight.\n'
f'Remaining: {diff}'
)
Lot = Pool().get('lot.lot')
Split = Pool().get('lot.split.merge')
children = []
# Parent becomes technical
self.lot_av = 'locked'
self.lot_role = 'technical'
self.lot_name = 'TECH-' + self.lot_name
line = self.line
self.line = None
Lot.save([self])
for s in splits:
ratio = s['quantity'] / lot_weight
lot_qt = None
if self.lot_qt:
lot_qt = (Decimal(self.lot_qt) * ratio).quantize(
Decimal('0.00001'), rounding=ROUND_HALF_UP
)
child = Lot(
lot_name=s.get('name'),
lot_role='normal',
lot_type='physic',
lot_qt=float(lot_qt) if lot_qt is not None else None,
lot_unit=self.lot_unit,
lot_product=self.lot_product,
lot_status=self.lot_status,
lot_av=self.lot_av,
lot_unit_line=self.lot_unit_line,
)
with Transaction().set_context(_skip_function_fields=True):
Lot.save([child])
child.set_current_quantity(s['quantity'], s['quantity'], 1)
child.lot_parent = self
child.line = line
Lot.save([child])
children.append(child)
ops = []
for c in children:
ops.append(
Split(
operation='split',
source_lot=self,
target_lot=c,
lot_qt=c.lot_qt,
quantity=c.get_current_quantity(),
operation_date=Date.today(),
)
)
Split.save(ops)
return children
@classmethod
def merge_lots(cls, lots):
Date = Pool().get('ir.date')
if len(lots) < 2:
raise UserError('At least two lots are required for merge.')
# parent = lots[0].lot_parent
# if not parent or parent.lot_role != 'technical':
# raise UserError('Lots must share a technical parent.')
product = lots[0].lot_product
for l in lots:
if l.lot_product != product:
raise UserError('Cannot merge lots of different products.')
Lot = Pool().get('lot.lot')
Split = Pool().get('lot.split.merge')
total_qt = sum([l.lot_qt or 0 for l in lots])
total_weight = sum([l.get_current_quantity() for l in lots])
ops = []
parents_name = ""
for l in lots:
l.lot_av = 'locked'
l.lot_role = 'technical'
l.line = None
parents_name += l.lot_name + "/"
ops.append(
Split(
operation='merge',
source_lot=l,
target_lot=merged,
lot_qt=l.lot_qt,
quantity=l.get_current_quantity(),
operation_date=Date.today(),
)
)
Lot.save(lots)
Split.save(ops)
merged = Lot(
lot_name='MERGE-' + parents_name,
lot_role='normal',
lot_type='physic',
lot_product=product,
lot_qt=total_qt,
lot_unit=lots[0].lot_unit,
lot_unit_line=lots[0].lot_unit_line,
)
merged.set_current_quantity(total_weight, total_weight, 1)
Lot.save([merged])
return merged
def get_split_graph(self, name=None):
Split = Pool().get('lot.split.merge')
nodes = {}
edges = []
def add_lot_node(lot):
if lot.id in nodes:
return
nodes[lot.id] = {
'id': f'lot.lot {lot.id}',
'label': f'{lot.lot_name}\n({lot.lot_role})',
'data_model': 'lot.lot',
'data_id': lot.id,
'shape': 'box' if lot.lot_role == 'technical' else 'ellipse',
'color': {
'background': '#FFD966' if lot.lot_role == 'technical' else '#A7C7E7',
'border': '#555',
},
}
def walk(lot):
add_lot_node(lot)
ops = Split.search([('source_lot', '=', lot.id)])
for op in ops:
tgt = op.target_lot
add_lot_node(tgt)
edges.append({
'from': f'lot.lot {lot.id}',
'to': f'lot.lot {tgt.id}',
'label': op.operation if op.operation else '',
'arrows': 'to',
'color': {
'color': '#267F82',
'highlight': '#1D5F62',
'hover': '#1D5F62'
}
})
walk(tgt)
walk(self)
# Calcul des relations parent-enfant
from collections import defaultdict
children_map = defaultdict(list)
parent_map = {}
for edge in edges:
from_id = edge['from']
to_id = edge['to']
children_map[from_id].append(to_id)
parent_map[to_id] = from_id
# Trouver la racine (nœud sans parent)
all_node_ids = {node['id'] for node in nodes.values()}
root_candidates = all_node_ids - set(parent_map.keys())
if root_candidates:
root_id = list(root_candidates)[0]
else:
root_id = f'lot.lot {self.id}'
# Assigner les niveaux hiérarchiques
levels = {}
def assign_levels(node_id, level=0):
if node_id in levels and levels[node_id] >= level:
return
levels[node_id] = level
for child in children_map.get(node_id, []):
assign_levels(child, level + 1)
assign_levels(root_id)
# Grouper les nœuds par niveau
by_level = defaultdict(list)
for node_id, level in levels.items():
by_level[level].append(node_id)
# Calculer les positions - IMPORTANT: utiliser des valeurs positives plus grandes
# Calculer les positions pour un graphe HORIZONTAL
X_SPACING = 180 # espace entre les niveaux (gauche → droite)
Y_SPACING = 100 # espace entre les nœuds dun même niveau
max_nodes_in_level = max(len(nodes) for nodes in by_level.values()) if by_level else 1
total_height = max_nodes_in_level * Y_SPACING
for level, node_ids in by_level.items():
level_height = len(node_ids) * Y_SPACING
start_y = (total_height - level_height) / 2 # Centrer verticalement
for i, node_id in enumerate(node_ids):
node_num = int(node_id.split()[-1])
# Niveau → X
x_pos = level * X_SPACING + 300
# Répartition verticale
y_pos = start_y + (i * Y_SPACING) + 100
nodes[node_num]['x'] = x_pos
nodes[node_num]['y'] = y_pos
nodes[node_num]['fixed'] = {
'x': True,
'y': True
}
# S'assurer que tous les nœuds ont des positions
for node in nodes.values():
if 'x' not in node:
node['x'] = 0
node['y'] = 0
node['fixed'] = {'x': True, 'y': True}
data = {
'nodes': list(nodes.values()),
'edges': edges,
'physics': {
'enabled': False # Désactivé car positions fixes
}
}
return 'visjson:' + json.dumps(data)
def get_shipment_origin(self, name):
if self.lot_shipment_in:
return 'stock.shipment.in,' + str(self.lot_shipment_in.id)
elif self.lot_shipment_out:
return 'stock.shipment.out,' + str(self.lot_shipment_out.id)
elif self.lot_shipment_internal:
return 'stock.shipment.internal,' + str(self.lot_shipment_internal.id)
return None
def get_unit_line(self,name):
if self.line:
return self.line.unit
return self.lot_unit_line
def get_lot_inv(self):
if self.invoice_line:
return self.invoice_line.invoice.origin
if self.invoice_line_prov:
return self.invoice_line_prov.invoice.origin
@classmethod
def __setup__(cls):
super(Lot, cls).__setup__()
#cls._order.insert(0, ('lot_shipment', 'ASC'))
@classmethod
def default_lot_qt(cls):
return Decimal(0)
@classmethod
def default_lot_av(cls):
return 'available'
@classmethod
def default_lot_status(cls):
return 'forecast'
@classmethod
def default_lot_type(cls):
return 'physic'
@classmethod
def default_lot_gross_quantity(cls):
return Decimal(0)
@classmethod
def default_lot_type(cls):
return 'receive'
@classmethod
def default_lot_state(cls):
LotQtType = Pool().get('lot.qt.type')
lqt = LotQtType.search([('sequence','=',1)])
if lqt:
return lqt[0].id
def get_price_ct_symbol(self,name):
if self.line:
return str(self.line.currency.symbol) + "/" + str(self.line.unit.symbol)
def get_price_ct_symbol_sale(self,name):
if self.sale_line:
return str(self.sale_line.currency.symbol) + "/" + str(self.sale_line.unit.symbol)
def get_price_ct_symbol_premium(self,name):
if self.line:
if self.line.enable_linked_currency:
return str(self.line.linked_currency.name) + "/" + str(self.line.linked_unit.symbol)
else:
return str(self.line.currency.symbol) + "/" + str(self.line.unit.symbol)
def get_hist_quantity(self,state_id=0):
qt = Decimal(0)
gross_qt = Decimal(0)
if self.lot_state:
if self.lot_hist:
if state_id != 0:
st = state_id
else:
st = self.lot_state.id
logger.info("GET_HIST_QT:%s",st)
lot = [e for e in self.lot_hist if e.quantity_type.id == st][0]
qt = round(lot.quantity,5)
gross_qt = round(lot.gross_quantity,5)
return qt, gross_qt
def get_virtual_diff(self):
Uom = Pool().get('product.uom')
line = self.line if self.line else self.sale_line
if line:
if line.lots:
physic_sum = Decimal(0)
for l in line.lots:
if l.lot_type == 'physic' :
factor = None
rate = None
if l.lot_unit_line.category.id != l.line.unit.category.id:
factor = 1
rate = 1
physic_sum += round(Decimal(Uom.compute_qty(Uom(l.lot_unit_line),float(l.get_current_quantity()),l.line.unit, True, factor, rate)),5)
return line.quantity_theorical - physic_sum
def get_current_quantity(self,name=None):
# if self.lot_type == 'physic':
qt, gross_qt = self.get_hist_quantity()
return qt
# else:
# return self.get_virtual_diff()
def get_current_quantity_converted(self,state_id=0,unit=None):
Uom = Pool().get('product.uom')
if not unit:
unit = self.line.unit if self.line else self.sale_line.unit
qt, gross_qt = self.get_hist_quantity(state_id)
factor = None
rate = None
if self.lot_unit_line.category.id != unit.category.id:
factor = 1
rate = 1
return round(Decimal(Uom.compute_qty(self.lot_unit_line, float(qt), unit, True, factor, rate)),5)
def get_current_gross_quantity_converted(self,state_id=0,unit=None):
Uom = Pool().get('product.uom')
if not unit:
unit = self.line.unit if self.line else self.sale_line.unit
qt, gross_qt = self.get_hist_quantity(state_id)
factor = None
rate = None
if self.lot_unit_line.category.id != unit.category.id:
factor = 1
rate = 1
return round(Decimal(Uom.compute_qty(self.lot_unit_line, float(gross_qt), unit, True, factor, rate)),5)
def get_current_gross_quantity(self,name=None):
if self.lot_type == 'physic':
qt, gross_qt = self.get_hist_quantity()
return gross_qt
else:
return None
def add_quantity_to_hist(self,net,gross,qt_type):
LotQtHist = Pool().get('lot.qt.hist')
lqh = LotQtHist()
lqh.quantity_type = qt_type
lqh.quantity = net
logger.info("ADD_QUANTITY_TO_HIST:%s",gross)
lqh.gross_quantity = gross
lqh.lot = self
return lqh
def set_current_quantity(self, net, gross, seq=1):
LotQtType = Pool().get('lot.qt.type')
lqtt = LotQtType.search([('sequence', '=', seq)])
if not lqtt:
return
lot_hist = list(getattr(self, 'lot_hist', []) or [])
existing = [e for e in lot_hist if e.quantity_type == lqtt[0]]
if existing:
hist = existing[0]
hist.quantity = net
logger.info("SET_CURRENT_HIST:%s",gross)
hist.gross_quantity = gross
else:
lot_hist.append(self.add_quantity_to_hist(net, gross, lqtt[0]))
self.lot_hist = lot_hist
self.lot_state = lqtt[0]
def get_unit(self,name):
if self.line:
return self.line.unit
if self.sale_line:
return self.sale_line.unit
def get_lot_price(self,name=None):
price = Decimal(0)
if self.line:
if self.line.enable_linked_currency and self.line.linked_price and self.line.linked_currency and self.line.price_type == 'priced':
return self.line.get_price_linked_currency((self.lot_premium if self.lot_premium else 0))
else:
return self.line.get_price((self.lot_premium if self.lot_premium else 0))
def get_lot_sale_price(self,name=None):
price = Decimal(0)
if self.sale_line:
# if self.line.enable_linked_currency and self.line.linked_price and self.line.linked_currency and self.line.price_type == 'priced':
# return self.line.get_price_linked_currency((self.lot_premium if self.lot_premium else 0))
# else:
return self.sale_line.get_price((self.lot_premium if self.lot_premium else 0))
def get_sale_amount(self,name):
round_context = getcontext()
round_context.rounding = ROUND_HALF_UP
price = self.get_lot_sale_price()
return None
def get_amount(self,name):
round_context = getcontext()
round_context.rounding = ROUND_HALF_UP
price = self.get_lot_price()
return None
class QtType(ModelSQL, ModelView):
"Type"
__name__ = 'lot.qt.type'
name = fields.Char("Name")
sequence = fields.Integer("Sequence")
class QtHist(ModelSQL, ModelView):
"Quantities"
__name__ = 'lot.qt.hist'
lot = fields.Many2One(
'lot.lot', "Lot", ondelete='CASCADE',
)
quantity_type = fields.Many2One('lot.qt.type',"Type")
quantity = fields.Numeric("Net weight",digits=(1,5))
gross_quantity = fields.Numeric("Gross weight",digits=(1,5))
class LotSplitMerge(ModelSQL, ModelView):
"Lot Split Merge"
__name__ = 'lot.split.merge'
operation = fields.Selection([
('split', 'Split'),
('merge', 'Merge'),
], 'Operation', required=True)
source_lot = fields.Many2One(
'lot.lot', 'Source Lot', required=True, ondelete='CASCADE'
)
target_lot = fields.Many2One(
'lot.lot', 'Target Lot', required=True, ondelete='CASCADE'
)
lot_qt = fields.Float('Elements count')
quantity = fields.Numeric('Weight', digits=(16, 5))
operation_date = fields.Date('Date', required=True)
reversed_by = fields.Many2One(
'lot.split.merge', 'Reversed By'
)
class SplitLine(ModelView):
"Split Line"
__name__ = 'lot.split.wizard.line'
name = fields.Char('Lot name')
lot_qt = fields.Float('Quantity')
weight = fields.Numeric('Weight', digits=(16,5))
class SplitWizardStart(ModelView):
"Split Line Start"
__name__ = 'lot.split.wizard.start'
mode = fields.Selection([
('equal', 'Equal parts'),
('manual', 'Manual'),
], 'Mode', required=True)
parts = fields.Integer(
'Parts',
states={'required': Eval('mode') == 'equal'}
)
create_remainder = fields.Boolean(
'Create remainder lot',
help='If checked, a remainder lot will be created when the split '
'does not exactly match the original weight.'
)
lines = fields.One2Many(
'lot.split.wizard.line',
None,
'Lines',
states={'invisible': Eval('mode') != 'manual'}
)
class SplitWizard(Wizard):
"Lot Split Wizard"
__name__ = 'lot.split.wizard'
start = StateView(
'lot.split.wizard.start',
'lot.split_wizard_start_view',
[
Button('Cancel', 'end', 'tryton-cancel'),
Button('Split', 'split', 'tryton-ok', default=True),
]
)
split = StateTransition()
def transition_split(self):
Lot = Pool().get('lot.lot')
lot = Lot(Transaction().context['active_id'])
if self.start.mode == 'equal':
part = lot.get_current_quantity() / self.start.parts
splits = [
{'quantity': part, 'name': f'{lot.lot_name}-{i+1}'}
for i in range(self.start.parts)
]
else:
splits = [
{'quantity': l.weight, 'lot_qt': l.lot_qt, 'name': l.name}
for l in self.start.lines
]
lot.split_by_weight(splits)
return 'end'
class SplitWizard(Wizard):
__name__ = 'lot.split.wizard'
start = StateView(
'lot.split.wizard.start',
'lot.split_wizard_start_view',
[
Button('Cancel', 'end', 'tryton-cancel'),
Button('Split', 'split', 'tryton-ok', default=True),
]
)
split = StateTransition()
def transition_split(self):
Lot = Pool().get('lot.lot')
lot = Lot(Transaction().context['active_id'])
total_weight = lot.get_current_quantity().quantize(
Decimal('0.00001'), rounding=ROUND_HALF_UP
)
splits = []
allocated = Decimal('0.0')
# -------------------------------------------------
# EQUAL MODE
# -------------------------------------------------
if self.start.mode == 'equal':
if not self.start.parts or self.start.parts <= 0:
raise UserError('Number of parts must be greater than zero.')
base = (total_weight / self.start.parts).quantize(
Decimal('0.00001'), rounding=ROUND_HALF_UP
)
for i in range(self.start.parts):
qty = base
if i == self.start.parts - 1:
qty = total_weight - allocated # absorbe le reste
splits.append({
'quantity': qty,
'lot_qt': lot.lot_qt / self.start.parts if lot.lot_qt else None,
'name': f'{lot.lot_name}-{i + 1}',
})
allocated += qty
# -------------------------------------------------
# MANUAL MODE
# -------------------------------------------------
else:
for line in self.start.lines:
if not line.weight or line.weight <= 0:
raise UserError('Each line must have a positive weight.')
splits.append({
'quantity': line.weight,
'lot_qt': line.lot_qt,
'name': line.name,
})
allocated += line.weight
diff = (total_weight - allocated).quantize(
Decimal('0.00001'), rounding=ROUND_HALF_UP
)
if diff != 0:
if self.start.create_remainder and diff > 0:
splits.append({
'quantity': diff,
'lot_qt': None,
'name': f'{lot.lot_name}-REMAINDER',
})
else:
raise UserError(
f'Split does not match the lot weight.\n'
f'Remaining difference: {diff}'
)
lot.split_by_weight(splits)
return 'end'
class MergeLotsStart(ModelView):
"Merge Lots Start"
__name__ = 'lot.merge.wizard.start'
confirm = fields.Boolean('Confirm merge')
class MergeLots(Wizard):
"Merge Lots"
__name__ = 'lot.merge.wizard'
start = StateView(
'lot.merge.wizard.start',
'lot.lot_merge_wizard_start_view',
[
Button('Cancel', 'end', 'tryton-cancel'),
Button('Merge', 'merge', 'tryton-ok', default=True),
]
)
merge = StateTransition()
def transition_merge(self):
Lot = Pool().get('lot.lot')
ids = Transaction().context['active_ids']
lots = Lot.browse(ids)
Lot.merge_lots(lots)
return 'end'