Files
tradon/modules/purchase_trade/fee.py
2026-01-17 17:36:39 +01:00

589 lines
24 KiB
Python
Executable File

from functools import wraps
from trytond.model import fields
from trytond.report import Report
from trytond.pool import Pool, PoolMeta
from trytond.pyson import Bool, Eval, Id, If
from trytond.model import (ModelSQL, ModelView)
from trytond.tools import is_full_text, lstrip_wildcard
from trytond.transaction import Transaction, inactive_records
from decimal import getcontext, Decimal, ROUND_HALF_UP
from sql.aggregate import Count, Max, Min, Sum, Avg, BoolOr
from sql.conditionals import Case
from sql import Column, Literal
from sql.functions import CurrentTimestamp, DateTrunc
from trytond.wizard import Button, StateTransition, StateView, Wizard
from itertools import chain, groupby
from operator import itemgetter
import datetime
import logging
from collections import defaultdict
from trytond.exceptions import UserWarning, UserError
logger = logging.getLogger(__name__)
def filter_state(state):
def filter(func):
@wraps(func)
def wrapper(cls, fees):
fees = [f for f in fees if f.state == state]
return func(cls, fees)
return wrapper
return filter
class Fee(ModelSQL,ModelView):
"Fee"
__name__ = 'fee.fee'
line = fields.Many2One('purchase.line',"Line")
shipment_in = fields.Many2One('stock.shipment.in')
shipment_out = fields.Many2One('stock.shipment.out')
shipment_internal = fields.Many2One('stock.shipment.internal')
currency = fields.Many2One('currency.currency',"Currency")
supplier = fields.Many2One('party.party',"Supplier", required=True)
type = fields.Selection([
('budgeted', 'Budgeted'),
('ordered', 'Ordered'),
('actual', 'Actual'),
], "Type", required=True)
p_r = fields.Selection([
('pay', 'PAY'),
('rec', 'REC'),
], "P/R", required=True)
product = fields.Many2One('product.product',"Product", required=True, domain=[('type', '=', 'service')])
price = fields.Numeric("Price",digits=(1,4))
mode = fields.Selection([
('lumpsum', 'Lump sum'),
('perqt', 'Per qt'),
('pprice', '% price'),
('pcost', '% cost price'),
], 'Mode', required=True)
inherit_qt = fields.Boolean("Inh Qt")
quantity = fields.Function(fields.Numeric("Qt",digits='unit'),'get_quantity')
unit = fields.Function(fields.Many2One('product.uom',"Unit"),'get_unit')
inherit_shipment = fields.Boolean("Inh Sh",states={
'invisible': (Eval('shipment_in')),
})
purchase = fields.Many2One('purchase.purchase',"Purchase", ondelete='CASCADE')
amount = fields.Function(fields.Numeric("Amount", digits='currency'),'get_amount')
fee_lots = fields.Function(fields.Many2Many('lot.lot', None, None, "Lots"),'get_lots')#, searcher='search_lots')
lots = fields.Many2Many('fee.lots', 'fee', 'lot',"Lots",domain=[('id', 'in', Eval('fee_lots',-1))] )
lots_cp = fields.Integer("Lots number")
state = fields.Selection([
('not invoiced', 'Not invoiced'),
('invoiced', 'Invoiced'),
], string='State', readonly=True)
fee_landed_cost = fields.Function(fields.Boolean("Inventory"),'get_landed_status')
inv = fields.Function(fields.Many2One('account.invoice',"Invoice"),'get_invoice')
weight_type = fields.Selection([
('net', 'Net'),
('brut', 'Brut'),
], string='W. type')
def get_lots(self, name):
logger.info("GET_LOTS_LINE:%s",self.line)
logger.info("GET_LOTS_SHIPMENT_IN:%s",self.shipment_in)
Lot = Pool().get('lot.lot')
if self.line:
return self.line.lots
if self.shipment_in:
lots = Lot.search([('lot_shipment_in','=',self.shipment_in.id)])
logger.info("LOTSDOMAIN:%s",lots)
if lots:
return lots + [lots[0].getVlot_p()]
if self.shipment_internal:
return Lot.search([('lot_shipment_internal','=',self.shipment_internal.id)])
if self.shipment_out:
return Lot.search([('lot_shipment_out','=',self.shipment_out.id)])
return Lot.search(['id','>',0])
def get_cog(self,lot):
MoveLine = Pool().get('account.move.line')
Currency = Pool().get('currency.currency')
Date = Pool().get('ir.date')
AccountConfiguration = Pool().get('account.configuration')
account_configuration = AccountConfiguration(1)
Uom = Pool().get('product.uom')
ml = MoveLine.search([
('lot', '=', lot.id),
('fee', '=', self.id),
('account', '=', self.product.account_stock_in_used.id),
('origin', 'ilike', '%stock.move%'),
])
logger.info("GET_COG_FEE:%s",ml)
if ml:
return round(Decimal(sum([e.credit-e.debit for e in ml if e.description != 'Delivery fee'])),2)
@classmethod
def __setup__(cls):
super().__setup__()
cls._buttons.update({
'invoice': {
'invisible': (Eval('state') == 'invoiced'),
'depends': ['state'],
},
})
@classmethod
def default_state(cls):
return 'not invoiced'
@classmethod
def default_p_r(cls):
return 'pay'
def get_unit(self, name):
Lot = Pool().get('lot.lot')
if self.lots:
if self.lots[0].line:
return self.lots[0].line.unit
if self.lots[0].sale_line:
return self.lots[0].sale_line.unit
@classmethod
@ModelView.button
@filter_state('not invoiced')
def invoice(cls, fees):
Purchase = Pool().get('purchase.purchase')
FeeLots = Pool().get('fee.lots')
for fee in fees:
if fee.purchase:
fl = FeeLots.search([('fee','=',fee.id)])
logger.info("PROCESS_FROM_FEE:%s",fl)
Purchase._process_invoice([fee.purchase],[e.lot for e in fl],'service')
cls.write(fees, {'state': 'invoiced',})
@classmethod
def default_type(cls):
return 'budgeted'
@classmethod
def default_weight_type(cls):
return 'brut'
def get_price_per_qt(self):
price = Decimal(0)
if self.mode == 'lumpsum':
if self.quantity:
if self.quantity > 0:
return round(self.price / self.quantity,4)
elif self.mode == 'perqt':
return self.price
elif self.mode == 'pprice' or self.mode == 'pcost':
if self.line and self.price:
return round(self.price * Decimal(self.line.unit_price) / 100,4)
if self.sale_line and self.price:
return round(self.price * Decimal(self.sale_line.unit_price) / 100,4)
if self.shipment_in:
StockMove = Pool().get('stock.move')
sm = StockMove.search(['shipment','=','stock.shipment.in,'+str(self.shipment_in.id)])
if sm:
if sm[0].lot:
return round(self.price * Decimal(sm[0].lot.get_lot_price()) / 100,4)
return price
def get_invoice(self,name):
if self.purchase:
if self.purchase.invoices:
return self.purchase.invoices[0]
def get_landed_status(self,name):
if self.product:
return self.product.landed_cost
def get_quantity(self,name=None):
qt = self.get_fee_lots_qt()
if qt:
return qt
LotQt = Pool().get('lot.qt')
lqts = LotQt.search(['lot_shipment_in','=',self.shipment_in.id])
if lqts:
return Decimal(lqts[0].lot_quantity)
def get_amount(self,name=None):
sign = Decimal(1)
if self.price:
# if self.p_r:
# if self.p_r == 'pay':
# sign = -1
if self.mode == 'lumpsum':
return self.price * sign
elif self.mode == 'perqt':
if self.shipment_in:
StockMove = Pool().get('stock.move')
sm = StockMove.search(['shipment','=','stock.shipment.in,'+str(self.shipment_in.id)])
if sm:
unique_lots = {e.lot for e in sm if e.lot}
return round(self.price * Decimal(sum([e.get_current_quantity_converted() for e in unique_lots])) * sign,2)
LotQt = Pool().get('lot.qt')
lqts = LotQt.search(['lot_shipment_in','=',self.shipment_in.id])
if lqts:
return round(self.price * Decimal(lqts[0].lot_quantity) * sign,2)
return round((self.quantity if self.quantity else 0) * self.price * sign,2)
elif self.mode == 'pprice':
if self.line:
return round(self.price / 100 * self.line.unit_price * (self.quantity if self.quantity else 0) * sign,2)
if self.sale_line:
return round(self.price / 100 * self.sale_line.unit_price * (self.quantity if self.quantity else 0) * sign,2)
if self.shipment_in:
StockMove = Pool().get('stock.move')
sm = StockMove.search(['shipment','=','stock.shipment.in,'+str(self.shipment_in.id)])
if sm:
if sm[0].lot:
return round(self.price * Decimal(sum([e.lot.get_lot_price() for e in sm if e.lot])) / 100 * self.quantity * sign,2)
LotQt = Pool().get('lot.qt')
lqts = LotQt.search(['lot_shipment_in','=',self.shipment_in.id])
if lqts:
return round(self.price * Decimal(lqts[0].lot_p.get_lot_price()) / 100 * lqts[0].lot_quantity * sign,2)
@classmethod
def write(cls, *args):
super().write(*args)
fees = sum(args[::2], [])
for fee in fees:
fee.adjust_purchase_values()
@classmethod
def copy(cls, fees, default=None):
if default is None:
default = {}
else:
default = default.copy()
# Important : on vide le champ 'lots'
default.setdefault('lots', [])
return super().copy(fees, default=default)
def get_fee_lots_qt(self):
qt = Decimal(0)
FeeLots = Pool().get('fee.lots')
fee_lots = FeeLots.search([('fee', '=', self.id)])
if fee_lots:
qt = sum([e.lot.get_current_quantity_converted() for e in fee_lots])
logger.info("GET_FEE_LOTS_QT:%s",qt)
return qt
def adjust_purchase_values(self):
Purchase = Pool().get('purchase.purchase')
PurchaseLine = Pool().get('purchase.line')
logger.info("ADJUST_PURCHASE_VALUES:%s",self)
if self.type == 'ordered' and self.state == 'not invoiced' and self.purchase:
logger.info("ADJUST_PURCHASE_VALUES_QT:%s",self.purchase.lines[0].quantity)
if self.price != self.purchase.lines[0].unit_price:
self.purchase.lines[0].unit_price = self.price
if self.quantity != self.purchase.lines[0].quantity:
self.purchase.lines[0].quantity = self.quantity
if self.product != self.purchase.lines[0].product:
self.purchase.lines[0].product = self.product
PurchaseLine.save([self.purchase.lines[0]])
if self.supplier != self.purchase.party:
self.purchase.party = self.supplier
if self.currency != self.purchase.currency:
self.purchase.currency = self.currency
Purchase.save([self.purchase])
# @classmethod
# def validate(cls, fees):
# super(Fee, cls).validate(fees)
@classmethod
def create(cls, vlist):
vlist = [x.copy() for x in vlist]
fees = super(Fee, cls).create(vlist)
qt_sh = Decimal(0)
qt_line = Decimal(0)
unit = None
for fee in fees:
FeeLots = Pool().get('fee.lots')
Lots = Pool().get('lot.lot')
LotQt = Pool().get('lot.qt')
if fee.line:
for l in fee.line.lots:
if (l.lot_type == 'virtual' and len(fee.line.lots)==1) or (l.lot_type == 'physic' and len(fee.line.lots)>1):
fl = FeeLots()
fl.fee = fee.id
fl.lot = l.id
fl.line = l.line.id
FeeLots.save([fl])
qt_line += l.get_current_quantity_converted()
unit = l.line.unit
if fee.sale_line:
for l in fee.sale_line.lots:
if (l.lot_type == 'virtual' and len(fee.sale_line.lots)==1) or (l.lot_type == 'physic' and len(fee.sale_line.lots)>1):
fl = FeeLots()
fl.fee = fee.id
fl.lot = l.id
fl.sale_line = l.sale_line.id
FeeLots.save([fl])
qt_line += l.get_current_quantity_converted()
unit = l.sale_line.unit
if fee.shipment_in:
if fee.shipment_in.state == 'draft'or fee.shipment_in.state == 'started':
lots = Lots.search(['lot_shipment_in','=',fee.shipment_in.id])
if lots:
for l in lots:
fl = FeeLots()
fl.fee = fee.id
fl.lot = l.id
FeeLots.save([fl])
qt_sh += l.get_current_quantity_converted()
unit = l.line.unit
else:
lqts = LotQt.search(['lot_shipment_in','=',fee.shipment_in.id])
if lqts:
for l in lqts:
qt_sh += l.lot_p.get_current_quantity_converted()
unit = l.lot_p.line.unit
else:
raise UserError("You cannot add fee on received shipment!")
type = fee.type
if type == 'ordered':
Purchase = Pool().get('purchase.purchase')
PurchaseLine = Pool().get('purchase.line')
pl = PurchaseLine()
pl.product = fee.product
if fee.line or fee.sale_line:
pl.quantity = round(qt_line,5)
if fee.shipment_in:
pl.quantity = round(qt_sh,5)
logger.info("CREATE_PURHCASE_FOR_FEE_QT:%s",pl.quantity)
pl.unit = unit
pl.fee_ = fee.id
if fee.price:
fee_price = fee.get_price_per_qt()
logger.info("GET_FEE_PRICE_PER_QT:%s",fee_price)
pl.unit_price = round(Decimal(fee_price),4)
p = Purchase()
p.lines = [pl]
p.party = fee.supplier
if p.party.addresses:
p.invoice_address = p.party.addresses[0]
p.currency = fee.currency
p.line_type = 'service'
Purchase.save([p])
#if reception of moves done we need to generate accrual for fee
if not fee.sale_line:
feelots = FeeLots.search(['fee','=',fee.id])
for fl in feelots:
move = fl.lot.get_received_move()
if move:
Warning = Pool().get('res.user.warning')
warning_name = Warning.format("Lot ever received", [])
if Warning.check(warning_name):
raise UserWarning(warning_name,
"By clicking yes, an accrual for this fee will be created")
AccountMove = Pool().get('account.move')
account_move = move._get_account_stock_move_fee(fee)
AccountMove.save([account_move])
return fees
class FeeLots(ModelSQL,ModelView):
"Fee lots"
__name__ = 'fee.lots'
fee = fields.Many2One('fee.fee',"Fee",required=True, ondelete='CASCADE')
lot = fields.Many2One('lot.lot',"Lot",required=True, ondelete='CASCADE')
class FeeReport(
ModelSQL, ModelView):
"Fee Report"
__name__ = 'fee.report'
r_purchase_line = fields.Many2One('purchase.line', "Purchase line")
r_sale_line = fields.Many2One('sale.line', "Sale line")
r_shipment_in = fields.Many2One('stock.shipment.in', "Shipment in")
r_shipment_out = fields.Many2One('stock.shipment.out', "Shipment out")
r_shipment_internal = fields.Many2One('stock.shipment.internal', "Shipment internal")
r_fee_type = fields.Many2One('product.product', 'Fee type')
r_fee_counterparty = fields.Many2One('party.party', "Counterparty", required=True)
r_type = fields.Selection([
('ordered', 'Ordered'),
('budgeted', 'Budgeted'),
('actual', 'Actual')
], 'Type')
r_fee_paystatus = fields.Selection([
('pay', 'PAY'),
('rec', 'REC')
], 'Pay status')
r_mode = fields.Selection([
('lumpsum', 'Lump sum'),
('perqt', 'Per qt'),
('pprice', '% price'),
('pcost', '% cost price'),
], 'Mode', required=True)
r_fee_quantity = fields.Function(fields.Numeric("Qt",digits=(1,4)),'get_quantity')
r_fee_unit = fields.Function(fields.Many2One('product.uom',"Unit"),'get_unit')
r_purchase = fields.Many2One('purchase.purchase',"Purchase", ondelete='CASCADE')
r_fee_amount = fields.Function(fields.Numeric("Amount", digits=(1,4)),'get_amount')
r_inv = fields.Function(fields.Many2One('account.invoice',"Invoice"),'get_invoice')
r_state = fields.Selection([
('not invoiced', 'Not invoiced'),
('invoiced', 'Invoiced'),
], string='State', readonly=True)
#r_fee_lots = fields.Function(fields.Many2Many('lot.lot', None, None, "Lots"),'get_lots')#, searcher='search_lots')
#r_lots = fields.Many2Many('fee.lots', 'fee', 'lot',"Lots",domain=[('id', 'in', Eval('r_fee_lots',[]))] )
r_fee_currency = fields.Many2One('currency.currency',"Currency")
r_fee_price = fields.Numeric("Price",digits=(1,4))
r_shipment_origin = fields.Function(
fields.Reference(
selection=[
("stock.shipment.in", "In"),
("stock.shipment.out", "Out"),
("stock.shipment.internal", "Internal"),
],
string="Shipment",
),
"get_shipment_origin",
)
def get_invoice(self,name):
if self.r_purchase:
if self.r_purchase.invoices:
return self.r_purchase.invoices[0]
def get_shipment_origin(self, name):
if self.r_shipment_in:
return 'stock.shipment.in,' + str(self.r_shipment_in.id)
elif self.r_shipment_out:
return 'stock.shipment.out,' + str(self.r_shipment_out.id)
elif self.r_shipment_internal:
return 'stock.shipment.internal,' + str(self.r_shipment_internal.id)
return None
# def get_lots(self, name):
# if self.r_purchase_line:
# return self.r_purchase_line.lots
def get_unit(self, name):
if self.r_purchase_line:
return self.r_purchase_line.unit
def get_quantity(self,name=None):
Fee = Pool().get('fee.fee')
fee = Fee(self.id)
return fee.get_quantity()
def get_amount(self,name=None):
Fee = Pool().get('fee.fee')
fee = Fee(self.id)
return fee.get_amount()
@classmethod
def table_query(cls):
FeeReport = Pool().get('fee.fee')
fr = FeeReport.__table__()
Purchase = Pool().get('purchase.purchase')
pu = Purchase.__table__()
PurchaseLine = Pool().get('purchase.line')
pl = PurchaseLine.__table__()
Sale = Pool().get('sale.sale')
sa = Sale.__table__()
SaleLine = Pool().get('sale.line')
sl = SaleLine.__table__()
context = Transaction().context
party = context.get('party')
fee_type = context.get('fee_type')
purchase = context.get('purchase')
sale = context.get('sale')
shipment_in = context.get('shipment_in')
shipment_out = context.get('shipment_out')
shipment_internal = context.get('shipment_internal')
asof = context.get('asof')
todate = context.get('todate')
wh = ((fr.create_date >= asof) & ((fr.create_date-datetime.timedelta(1)) <= todate))
if party:
wh &= (fr.fee_counterparty == party)
if fee_type:
wh &= (fr.fee_type == fee_type)
if purchase:
wh &= (pu.id == purchase)
if sale:
wh &= (sa.id == sale)
if shipment_in:
wh &= (fr.shipment_in == shipment_in)
# if shipment_out:
# wh &= (fr.shipment_out == shipment_out)
query = fr.join(pl,'LEFT',condition=fr.line == pl.id).join(pu,'LEFT', condition=pl.purchase == pu.id).select(
Literal(0).as_('create_uid'),
CurrentTimestamp().as_('create_date'),
Literal(None).as_('write_uid'),
Literal(None).as_('write_date'),
fr.id.as_('id'),
fr.line.as_('r_purchase_line'),
Literal(None).as_('r_sale_line'),
fr.shipment_in.as_('r_shipment_in'),
Literal(None).as_('r_shipment_out'),
fr.shipment_internal.as_('r_shipment_internal'),
fr.product.as_('r_fee_type'),
fr.supplier.as_('r_fee_counterparty'),
fr.type.as_('r_type'),
fr.p_r.as_('r_fee_paystatus'),
fr.mode.as_('r_mode'),
fr.state.as_('r_state'),
fr.purchase.as_('r_purchase'),
#fr.amount.as_('r_fee_amount'),
fr.price.as_('r_fee_price'),
fr.currency.as_('r_fee_currency'),
#fr.fee_lots.as_('r_fee_lots'),
#fr.lots.as_('r_lots'),
where=wh)
return query
@classmethod
def search_rec_name(cls, name, clause):
_, operator, operand, *extra = clause
if operator.startswith('!') or operator.startswith('not '):
bool_op = 'AND'
else:
bool_op = 'OR'
code_value = operand
if operator.endswith('like') and is_full_text(operand):
code_value = lstrip_wildcard(operand)
return [bool_op,
('r_fee_type', operator, operand, *extra),
('r_fee_counterparty', operator, operand, *extra),
]
class FeeContext(ModelView):
"Fee Context"
__name__ = 'fee.context'
asof = fields.Date("As of")
todate = fields.Date("To")
party = fields.Many2One('party.party', "Counterparty")
fee_type = fields.Many2One('product.product', 'Fee type')
purchase = fields.Many2One('purchase.purchase', "Purchase")
sale = fields.Many2One('sale.sale', "Sale")
shipment_in = fields.Many2One('stock.shipment.in',"Shipment In")
shipment_out = fields.Many2One('stock.shipment.out',"Shipment Out")
shipment_internal = fields.Many2One('stock.shipment.internal',"Shipment Internal")
@classmethod
def default_asof(cls):
pool = Pool()
Date = pool.get('ir.date')
return Date.today().replace(day=1,month=1,year=1999)
@classmethod
def default_todate(cls):
pool = Pool()
Date = pool.get('ir.date')
return Date.today()