Files
tradon/modules/purchase_trade/sale.py
2026-04-09 21:23:27 +02:00

1794 lines
69 KiB
Python
Executable File

# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
from trytond.model import fields
from trytond.pool import Pool, PoolMeta
from trytond.pyson import Bool, Eval, Id, If, PYSONEncoder
from trytond.model import (ModelSQL, ModelView)
from trytond.i18n import gettext
from trytond.wizard import Button, StateTransition, StateView, Wizard, StateAction
from trytond.transaction import Transaction, inactive_records
from decimal import getcontext, Decimal, ROUND_HALF_UP
from sql.aggregate import Count, Max, Min, Sum, Avg, BoolOr
from sql.conditionals import Case
from sql import Column, Literal
from sql.functions import CurrentTimestamp, DateTrunc
import datetime
import logging
import json
from trytond.exceptions import UserWarning, UserError
from trytond.modules.purchase_trade.numbers_to_words import quantity_to_words, amount_to_currency_words, format_date_en
logger = logging.getLogger(__name__)
VALTYPE = [
('priced', 'Price'),
('fee', 'Fee'),
('market', 'Market'),
('derivative', 'Derivative'),
]
class ContractDocumentType(metaclass=PoolMeta):
"Contract - Document Type"
__name__ = 'contract.document.type'
# lc_out = fields.Many2One('lc.letter.outgoing', 'LC out')
# lc_in = fields.Many2One('lc.letter.incoming', 'LC in')
sale = fields.Many2One('sale.sale', "Sale")
class AnalyticDimensionAssignment(metaclass=PoolMeta):
'Analytic Dimension Assignment'
__name__ = 'analytic.dimension.assignment'
sale = fields.Many2One('sale.sale', "Sale")
class Estimated(metaclass=PoolMeta):
"Estimated date"
__name__ = 'pricing.estimated'
sale_line = fields.Many2One('sale.line',"Line")
class FeeLots(metaclass=PoolMeta):
"Fee lots"
__name__ = 'fee.lots'
sale_line = fields.Many2One('sale.line',"Line")
class Backtoback(metaclass=PoolMeta):
'Back To Back'
__name__ = 'back.to.back'
sale = fields.One2Many('sale.sale','btb', "Sale")
class OpenPosition(metaclass=PoolMeta):
"Open position"
__name__ = 'open.position'
sale = fields.Many2One('sale.sale',"Sale")
sale_line = fields.Many2One('sale.line',"Sale Line")
client = fields.Many2One('party.party',"Client")
class SaleStrategy(ModelSQL):
"Sale - Document Type"
__name__ = 'sale.strategy'
sale_line = fields.Many2One('sale.line', 'Sale Line')
strategy = fields.Many2One('mtm.strategy', "Strategy")
class Component(metaclass=PoolMeta):
"Component"
__name__ = 'pricing.component'
sale_line = fields.Many2One('sale.line',"Line")
quota_sale = fields.Function(fields.Numeric("Quota",digits='unit'),'get_quota_sale')
unit_sale = fields.Function(fields.Many2One('product.uom',"Unit"),'get_unit_sale')
def getDelMonthDateSale(self):
PM = Pool().get('product.month')
if self.sale_line and hasattr(self.sale_line, 'del_period') and self.sale_line.del_period:
pm = PM(self.sale_line.del_period)
if pm:
logger.info("DELMONTHDATE:%s",pm.beg_date)
return pm.beg_date
def getEstimatedTriggerSale(self,t):
logger.info("GETTRIGGER:%s",t)
if t == 'delmonth':
return self.getDelMonthDateSale()
PE = Pool().get('pricing.estimated')
Date = Pool().get('ir.date')
pe = PE.search([('sale_line','=',self.sale_line),('trigger','=',t)])
if pe:
return pe[0].estimated_date
else:
return Date.today()
def get_unit_sale(self, name):
if self.sale_line:
return self.sale_line.unit
def get_quota_sale(self, name):
if self.sale_line:
quantity = getattr(self.sale_line, 'quantity_theorical', None)
if quantity is None:
quantity = getattr(self.sale_line, 'quantity', None)
if quantity is not None:
nbdays = self.nbdays if self.nbdays and self.nbdays > 0 else 1
return round(Decimal(quantity) / nbdays, 4)
class Pricing(metaclass=PoolMeta):
"Pricing"
__name__ = 'pricing.pricing'
sale_line = fields.Many2One('sale.line',"Lines")
unit = fields.Function(fields.Many2One('product.uom',"Unit"),'get_unit_sale')
def get_unit_sale(self,name):
if self.sale_line:
return self.sale_line.unit
def get_eod_price_sale(self):
if self.sale_line:
return round((self.fixed_qt * self.fixed_qt_price + self.unfixed_qt * self.unfixed_qt_price)/Decimal(self.sale_line.quantity),4)
return Decimal(0)
class Summary(ModelSQL,ModelView):
"Pricing summary"
__name__ = 'sale.pricing.summary'
sale_line = fields.Many2One('sale.line',"Lines")
price_component = fields.Many2One('pricing.component',"Component")
quantity = fields.Numeric("Qt",digits=(1,4))
fixed_qt = fields.Numeric("Fixed qt",digits=(1,4))
unfixed_qt = fields.Numeric("Unfixed qt",digits=(1,4))
price = fields.Numeric("Price",digits=(1,4))
progress = fields.Float("Fix. progress")
ratio = fields.Numeric("Ratio")
def get_name(self):
if self.price_component:
return self.price_component.get_rec_name()
return ""
def get_last_price(self):
Date = Pool().get('ir.date')
if self.price_component:
pc = Pool().get('pricing.component')(self.price_component)
if pc.price_index:
PI = Pool().get('price.price')
pi = PI(pc.price_index)
return pi.get_price(Date.today(),self.sale_line.unit,self.sale_line.sale.currency,True)
@classmethod
def table_query(cls):
SalePricing = Pool().get('pricing.pricing')
sp = SalePricing.__table__()
SaleComponent = Pool().get('pricing.component')
sc = SaleComponent.__table__()
#wh = Literal(True)
context = Transaction().context
group_pnl = context.get('group_pnl')
if group_pnl:
return None
query = sp.join(sc,'LEFT',condition=sp.price_component == sc.id).select(
Literal(0).as_('create_uid'),
CurrentTimestamp().as_('create_date'),
Literal(None).as_('write_uid'),
Literal(None).as_('write_date'),
Max(sp.id).as_('id'),
sp.sale_line.as_('sale_line'),
sp.price_component.as_('price_component'),
Max(sp.fixed_qt+sp.unfixed_qt).as_('quantity'),
Max(sp.fixed_qt).as_('fixed_qt'),
(Min(sp.unfixed_qt)).as_('unfixed_qt'),
Max(Case((sp.last, sp.eod_price),else_=0)).as_('price'),
(Max(sp.fixed_qt)/Max(sp.fixed_qt+sp.unfixed_qt)).as_('progress'),
Max(sc.ratio).as_('ratio'),
#where=wh,
group_by=[sp.sale_line,sp.price_component])
return query
class Lot(metaclass=PoolMeta):
__name__ = 'lot.lot'
sale_line = fields.Many2One('sale.line',"Sale",ondelete='CASCADE')
lot_quantity_sale = fields.Function(fields.Numeric("Net weight",digits='lot_unit'),'get_qt')
lot_gross_quantity_sale = fields.Function(fields.Numeric("Gross weight",digits='lot_unit'),'get_gross_qt')
def get_qt(self, name):
quantity = self.lot_quantity
if self.lot_hist:
for h in self.lot_hist:
if h.quantity_type.id == 3:
quantity = h.quantity
return quantity
def get_gross_qt(self, name):
quantity = self.lot_quantity
if self.lot_hist:
for h in self.lot_hist:
if h.quantity_type.id == 3:
quantity = h.quantity
return quantity
def getClient(self):
if self.sale_line:
return Pool().get('sale.sale')(self.sale_line.sale).party.id
def getSale(self):
if self.sale_line:
return self.sale_line.sale.id
class Sale(metaclass=PoolMeta):
__name__ = 'sale.sale'
btb = fields.Many2One('back.to.back',"Back to back")
bank_accounts = fields.Function(
fields.Many2Many('bank.account', None, None, "Bank Accounts"),
'on_change_with_bank_accounts')
bank_account = fields.Many2One(
'bank.account', "Bank Account",
domain=[('id', 'in', Eval('bank_accounts', []))],
depends=['bank_accounts'])
from_location = fields.Many2One('stock.location', 'From location', required=True,domain=[('type', "!=", 'customer')])
to_location = fields.Many2One('stock.location', 'To location', required=True,domain=[('type', "!=", 'supplier')])
shipment_out = fields.Many2One('stock.shipment.out','Sales')
#pnl = fields.One2Many('valuation.valuation', 'sale', 'Pnl')
pnl = fields.One2Many('valuation.valuation.dyn', 'r_sale', 'Pnl',states={'invisible': ~Eval('group_pnl'),})
pnl_ = fields.One2Many('valuation.valuation.line', 'sale', 'Pnl',states={'invisible': Eval('group_pnl'),})
group_pnl = fields.Boolean("Group Pnl")
derivatives = fields.One2Many('derivative.derivative', 'sale', 'Derivative')
#plans = fields.One2Many('workflow.plan','sale',"Execution plans")
forex = fields.One2Many('forex.cover.physical.sale','contract',"Forex",readonly=True)
broker = fields.Many2One('party.party',"Broker",domain=[('categories.parent', 'child_of', [4])])
tol_min = fields.Numeric("Tol - in %", required=True)
tol_max = fields.Numeric("Tol + in %", required=True)
tol_min_qt = fields.Numeric("Tol -")
tol_max_qt = fields.Numeric("Tol +")
certif = fields.Many2One('purchase.certification',"Certification", required=True,states={'invisible': Eval('company_visible'),})
wb = fields.Many2One('purchase.weight.basis',"Weight basis", required=True)
association = fields.Many2One('purchase.association',"Association", required=True,states={'invisible': Eval('company_visible'),})
crop = fields.Many2One('purchase.crop',"Crop",states={'invisible': Eval('company_visible'),})
viewer = fields.Function(fields.Text(""),'get_viewer')
doc_template = fields.Many2One('doc.template',"Template")
required_documents = fields.Many2Many(
'contract.document.type', 'sale', 'doc_type', 'Required Documents')
analytic_dimensions = fields.One2Many(
'analytic.dimension.assignment',
'sale',
'Analytic Dimensions'
)
trader = fields.Many2One(
'party.party', "Trader",
domain=[('categories.name', '=', 'TRADER')])
operator = fields.Many2One(
'party.party', "Operator",
domain=[('categories.name', '=', 'OPERATOR')])
our_reference = fields.Char("Our Reference")
company_visible = fields.Function(
fields.Boolean("Visible"), 'on_change_with_company_visible')
lc_date = fields.Date("LC date")
product_origin = fields.Char("Origin")
@fields.depends('company', '_parent_company.party')
def on_change_with_company_visible(self, name=None):
return bool(
self.company and self.company.party
and self.company.party.name == 'MELYA')
def _get_default_bank_account(self):
if not self.party or not self.party.bank_accounts:
return None
party_bank_accounts = list(self.party.bank_accounts)
if self.currency:
for account in party_bank_accounts:
if account.currency == self.currency:
return account
return party_bank_accounts[0]
@fields.depends('party', '_parent_party.bank_accounts')
def on_change_with_bank_accounts(self, name=None):
if self.party and self.party.bank_accounts:
return [account.id for account in self.party.bank_accounts]
return []
@fields.depends(
'company', 'party', 'invoice_party', 'shipment_party', 'warehouse',
'payment_term', 'lines', 'bank_account', '_parent_party.bank_accounts')
def on_change_party(self):
super().on_change_party()
self.bank_account = self._get_default_bank_account()
@fields.depends('party', 'currency', '_parent_party.bank_accounts')
def on_change_currency(self):
self.bank_account = self._get_default_bank_account()
@classmethod
def default_wb(cls):
WB = Pool().get('purchase.weight.basis')
wb = WB.search(['id','>',0])
if wb:
return wb[0].id
@classmethod
def default_certif(cls):
Certification = Pool().get('purchase.certification')
certification = Certification.search(['id','>',0])
if certification:
return certification[0].id
@classmethod
def default_association(cls):
Association = Pool().get('purchase.association')
association = Association.search(['id','>',0])
if association:
return association[0].id
@classmethod
def default_tol_min(cls):
return 0
@classmethod
def default_tol_max(cls):
return 0
def _get_report_lines(self):
return [line for line in self.lines if getattr(line, 'type', None) == 'line']
def _get_report_first_line(self):
lines = self._get_report_lines()
if lines:
return lines[0]
@staticmethod
def _format_report_number(value, digits='0.0000', keep_trailing_decimal=False,
strip_trailing_zeros=True):
value = Decimal(str(value or 0)).quantize(Decimal(digits))
text = format(value, 'f')
if strip_trailing_zeros:
text = text.rstrip('0').rstrip('.')
if keep_trailing_decimal and '.' not in text:
text += '.0'
return text or '0'
def _format_report_price_words(self, line):
value = self._get_report_display_price_value(line)
currency = self._get_report_display_currency(line)
if currency and (currency.rec_name or '').upper() == 'USC':
return amount_to_currency_words(value, 'USC', 'USC')
return amount_to_currency_words(value)
def _get_report_display_currency(self, line):
if getattr(line, 'price_type', None) == 'basis':
if getattr(line, 'enable_linked_currency', False) and getattr(line, 'linked_currency', None):
return line.linked_currency
return self.currency
return getattr(line, 'linked_currency', None) or self.currency
def _get_report_display_unit(self, line):
if getattr(line, 'price_type', None) == 'basis':
if getattr(line, 'enable_linked_currency', False) and getattr(line, 'linked_unit', None):
return line.linked_unit
return getattr(line, 'unit', None)
return getattr(line, 'linked_unit', None) or getattr(line, 'unit', None)
def _get_report_display_price_value(self, line):
if getattr(line, 'price_type', None) == 'basis':
if getattr(line, 'enable_linked_currency', False) and getattr(line, 'linked_currency', None):
return Decimal(str(line.premium or 0))
return Decimal(str(line._get_premium_price() or 0))
if getattr(line, 'linked_price', None):
return Decimal(str(line.linked_price or 0))
return Decimal(str(line.unit_price or 0))
def _format_report_price_line(self, line):
currency = self._get_report_display_currency(line)
unit = self._get_report_display_unit(line)
pricing_text = getattr(line, 'get_pricing_text', '') or ''
parts = [
(currency.rec_name.upper() if currency and currency.rec_name else '').strip(),
self._format_report_number(
self._get_report_display_price_value(line),
strip_trailing_zeros=False),
'PER',
(unit.rec_name.upper() if unit and unit.rec_name else '').strip(),
f"({self._format_report_price_words(line)})",
]
if pricing_text:
parts.append(pricing_text)
return ' '.join(part for part in parts if part)
@property
def report_terms(self):
line = self._get_report_first_line()
if line:
return line.note
return ''
@staticmethod
def _get_report_line_lots(line):
return list(getattr(line, 'lots', []) or [])
@classmethod
def _get_report_preferred_lots(cls, line):
lots = cls._get_report_line_lots(line)
physicals = [
lot for lot in lots
if getattr(lot, 'lot_type', None) == 'physic'
]
if physicals:
return physicals
virtuals = [
lot for lot in lots
if getattr(lot, 'lot_type', None) == 'virtual'
]
if len(virtuals) == 1:
return virtuals
return []
@staticmethod
def _get_report_lot_hist_weights(lot):
if not lot:
return None, None
if hasattr(lot, 'get_hist_quantity'):
net, gross = lot.get_hist_quantity()
return (
Decimal(str(net or 0)),
Decimal(str(gross if gross not in (None, '') else net or 0)),
)
hist = list(getattr(lot, 'lot_hist', []) or [])
state = getattr(lot, 'lot_state', None)
state_id = getattr(state, 'id', None)
if state_id is not None:
for entry in hist:
quantity_type = getattr(entry, 'quantity_type', None)
if getattr(quantity_type, 'id', None) == state_id:
net = Decimal(str(getattr(entry, 'quantity', 0) or 0))
gross = Decimal(str(
getattr(entry, 'gross_quantity', None)
if getattr(entry, 'gross_quantity', None) not in (None, '')
else net))
return net, gross
return None, None
@classmethod
def _get_report_line_weights(cls, line):
lots = cls._get_report_preferred_lots(line)
if lots:
net_total = Decimal(0)
gross_total = Decimal(0)
for lot in lots:
net, gross = cls._get_report_lot_hist_weights(lot)
if net is None:
continue
net_total += net
gross_total += gross
if net_total or gross_total:
return net_total, gross_total
quantity = Decimal(str(getattr(line, 'quantity', 0) or 0))
return quantity, quantity
@classmethod
def _get_report_line_unit(cls, line):
lots = cls._get_report_preferred_lots(line)
if lots and getattr(lots[0], 'lot_unit_line', None):
return lots[0].lot_unit_line
return getattr(line, 'unit', None)
def _get_report_total_unit(self):
virtual_units = []
for line in self._get_report_lines():
for lot in self._get_report_line_lots(line):
if (
getattr(lot, 'lot_type', None) == 'virtual'
and getattr(lot, 'lot_unit_line', None)):
virtual_units.append(lot.lot_unit_line)
if len(virtual_units) == 1:
return virtual_units[0]
line = self._get_report_first_line()
if line:
return self._get_report_line_unit(line)
return None
@staticmethod
def _get_report_unit_wording(unit):
label = ''
for attr in ('symbol', 'rec_name', 'name'):
value = getattr(unit, attr, None)
if value:
label = str(value).strip().upper()
break
mapping = {
'MT': ('METRIC TON', 'METRIC TONS'),
'METRIC TON': ('METRIC TON', 'METRIC TONS'),
'METRIC TONS': ('METRIC TON', 'METRIC TONS'),
'KG': ('KILOGRAM', 'KILOGRAMS'),
'KGS': ('KILOGRAM', 'KILOGRAMS'),
'KILOGRAM': ('KILOGRAM', 'KILOGRAMS'),
'KILOGRAMS': ('KILOGRAM', 'KILOGRAMS'),
'LB': ('POUND', 'POUNDS'),
'LBS': ('POUND', 'POUNDS'),
'POUND': ('POUND', 'POUNDS'),
'POUNDS': ('POUND', 'POUNDS'),
'BALE': ('BALE', 'BALES'),
'BALES': ('BALE', 'BALES'),
}
if label in mapping:
return mapping[label]
if label.endswith('S') and len(label) > 1:
return label[:-1], label
return label, label
@classmethod
def _report_quantity_to_words(cls, quantity, unit):
singular, plural = cls._get_report_unit_wording(unit)
return quantity_to_words(
quantity,
unit_singular=singular,
unit_plural=plural,
)
@staticmethod
def _convert_report_quantity(quantity, from_unit, to_unit):
value = Decimal(str(quantity or 0))
if not from_unit or not to_unit:
return value
if getattr(from_unit, 'id', None) == getattr(to_unit, 'id', None):
return value
from_name = getattr(from_unit, 'rec_name', None)
to_name = getattr(to_unit, 'rec_name', None)
if from_name and to_name and from_name == to_name:
return value
Uom = Pool().get('product.uom')
converted = Uom.compute_qty(from_unit, float(value), to_unit) or 0
return Decimal(str(converted))
def _get_report_total_weight(self, index):
lines = self._get_report_lines()
if not lines:
return None
total_unit = self._get_report_total_unit()
total = Decimal(0)
for line in lines:
quantity = self._get_report_line_weights(line)[index]
total += self._convert_report_quantity(
quantity,
self._get_report_line_unit(line),
total_unit,
)
return total
@property
def report_gross(self):
total = self._get_report_total_weight(1)
if total is not None:
return total
return ''
@property
def report_net(self):
total = self._get_report_total_weight(0)
if total is not None:
return total
return ''
@property
def report_total_quantity(self):
total = self._get_report_total_weight(0)
if total is not None:
return self._format_report_number(total, keep_trailing_decimal=True)
return '0.0'
@property
def report_quantity_unit_upper(self):
line = self._get_report_first_line()
unit = self._get_report_line_unit(line) if line else None
if unit and unit.rec_name:
return unit.rec_name.upper()
return ''
def _get_report_line_quantity(self, line):
return self._get_report_line_weights(line)[0]
@property
def report_qt(self):
total = self._get_report_total_weight(0)
if total is not None:
return self._report_quantity_to_words(
total, self._get_report_total_unit())
return ''
@property
def report_quantity_lines(self):
lines = self._get_report_lines()
if not lines:
return ''
details = []
for line in lines:
current_quantity = self._get_report_line_quantity(line)
quantity = self._format_report_number(
current_quantity, keep_trailing_decimal=True)
line_unit = self._get_report_line_unit(line)
unit = (
line_unit.rec_name.upper()
if line_unit and line_unit.rec_name else ''
)
words = self._report_quantity_to_words(current_quantity, line_unit)
period = line.del_period.description if getattr(line, 'del_period', None) else ''
detail = ' '.join(
part for part in [
quantity,
unit,
f"({words})",
f"- {period}" if period else '',
] if part)
if detail:
details.append(detail)
return '\n'.join(details)
@property
def report_nb_bale(self):
nb_bale = 0
lines = self._get_report_lines()
if lines:
for line in lines:
if line.lots:
nb_bale += sum([l.lot_qt for l in line.lots if l.lot_type == 'physic'])
if nb_bale:
return 'NB BALES: ' + str(int(nb_bale))
return ''
@property
def report_product_name(self):
line = self._get_report_first_line()
if line and line.product:
return line.product.name or ''
return ''
@property
def report_product_description(self):
line = self._get_report_first_line()
if line and line.product:
return line.product.description or ''
return ''
@property
def report_crop_name(self):
if self.crop:
return self.crop.name or ''
return ''
@property
def report_deal(self):
if self.lines and self.lines[0].lots and len(self.lines[0].lots)>1:
return self.lines[0].lots[1].line.purchase.number + ' ' + self.number
else:
''
@property
def report_packing(self):
nb_packing = 0
unit = ''
lines = self._get_report_lines()
if lines:
for line in lines:
if line.lots:
nb_packing += sum([l.lot_qt for l in line.lots if l.lot_type == 'physic'])
if len(line.lots)>1:
unit = line.lots[1].lot_unit.name
return str(int(nb_packing)) + unit
@property
def report_price(self):
line = self._get_report_first_line()
if line:
return self._format_report_price_words(line)
return ''
@property
def report_price_lines(self):
lines = self._get_report_lines()
if lines:
return '\n'.join(self._format_report_price_line(line) for line in lines)
return ''
@property
def report_trade_blocks(self):
lines = self._get_report_lines()
blocks = []
for line in lines:
current_quantity = self._get_report_line_quantity(line)
quantity = self._format_report_number(
current_quantity, keep_trailing_decimal=True)
line_unit = self._get_report_line_unit(line)
unit = (
line_unit.rec_name.upper()
if line_unit and line_unit.rec_name else ''
)
words = self._report_quantity_to_words(current_quantity, line_unit)
period = line.del_period.description if getattr(line, 'del_period', None) else ''
quantity_line = ' '.join(
part for part in [
quantity,
unit,
f"({words})",
f"- {period}" if period else '',
] if part)
price_line = self._format_report_price_line(line)
blocks.append((quantity_line, price_line))
return blocks
@property
def report_delivery(self):
del_date = 'PROMPT'
if self.lines:
if self.lines[0].estimated_date:
delivery_date = [dd.estimated_date for dd in self.lines[0].estimated_date if dd.trigger=='deldate']
if delivery_date:
del_date = delivery_date[0]
if del_date:
del_date = format_date_en(del_date)
return del_date
@property
def report_delivery_period_description(self):
line = self._get_report_first_line()
if line and line.del_period:
return line.del_period.description or ''
return ''
@property
def report_shipment_periods(self):
periods = []
for line in self._get_report_lines():
period = line.del_period.description if line.del_period else ''
if period and period not in periods:
periods.append(period)
if periods:
return '\n'.join(periods)
return ''
@property
def report_payment_date(self):
line = self._get_report_first_line()
if line:
if self.lc_date:
return format_date_en(self.lc_date)
Date = Pool().get('ir.date')
payment_date = line.sale.payment_term.lines[0].get_date(Date.today(), line)
if payment_date:
payment_date = format_date_en(payment_date)
return payment_date
def _get_report_bill_amount(self):
invoices = [
invoice for invoice in (self.invoices or [])
if getattr(invoice, 'state', None) != 'cancelled'
]
if invoices:
invoice = sorted(
invoices,
key=lambda i: (
getattr(i, 'invoice_date', None) or datetime.date.min,
getattr(i, 'id', 0)))[0]
return Decimal(str(getattr(invoice, 'total_amount', 0) or 0))
return Decimal(str(self.total_amount or 0))
@property
def report_bill_amount(self):
return self._get_report_bill_amount()
@property
def report_bill_amount_words(self):
value = self._get_report_bill_amount()
if self.currency and (self.currency.rec_name or '').upper() == 'USC':
return amount_to_currency_words(value, 'USC', 'USC')
return amount_to_currency_words(value)
@property
def report_bill_maturity_date(self):
maturity_dates = []
for invoice in (self.invoices or []):
if getattr(invoice, 'state', None) == 'cancelled':
continue
for line in (invoice.lines_to_pay or []):
if getattr(line, 'maturity_date', None):
maturity_dates.append(line.maturity_date)
if maturity_dates:
return min(maturity_dates)
if self.lc_date:
return self.lc_date
line = self._get_report_first_line()
if line and self.payment_term and self.payment_term.lines:
Date = Pool().get('ir.date')
return self.payment_term.lines[0].get_date(Date.today(), line)
@property
def report_shipment(self):
if self.lines:
if len(self.lines[0].lots)>1:
shipment = self.lines[0].lots[1].lot_shipment_in
lot = self.lines[0].lots[1].lot_name
if shipment:
info = ''
if shipment.bl_number:
info += ' B/L ' + shipment.bl_number
if shipment.supplier:
info += ' BY ' + shipment.supplier.name
if shipment.vessel:
info += ' (' + shipment.vessel.vessel_name + ')'
if shipment.container and shipment.container[0].container_no:
id = 1
for cont in shipment.container:
if id == 1:
info += ' Container(s)'
if cont.container_no:
info += ' ' + cont.container_no
else:
info += ' unnamed'
id += 1
# info += ' (LOT ' + lot + ')'
if shipment.note:
info += ' ' + shipment.note
return info
else:
return ''
@classmethod
def default_viewer(cls):
country_start = "Zobiland"
data = {
"highlightedCountryName": country_start
}
return "d3:" + json.dumps(data)
@fields.depends('doc_template','required_documents')
def on_change_with_required_documents(self):
if self.doc_template:
return self.doc_template.type
def get_viewer(self, name=None):
country_start = ''
dep_name = ''
arr_name = ''
departure = ''
arrival = ''
if self.party and self.party.addresses:
if self.party.addresses[0].country:
country_start = self.party.addresses[0].country.name
if self.from_location:
lat_from = self.from_location.lat
lon_from = self.from_location.lon
dep_name = self.from_location.name
departure = { "name":dep_name,"lat": str(lat_from), "lon": str(lon_from) }
if self.to_location:
lat_to = self.to_location.lat
lon_to = self.to_location.lon
arr_name = self.to_location.name
arrival = { "name":arr_name,"lat": str(lat_to), "lon": str(lon_to) }
data = {
"highlightedCountryNames": [{"name":country_start}],
"routePoints": [
{ "lon": -46.3, "lat": -23.9 },
{ "lon": -30.0, "lat": -20.0 },
{ "lon": -30.0, "lat": 0.0 },
{ "lon": -6.0, "lat": 35.9 },
{ "lon": 15.0, "lat": 38.0 },
{ "lon": 29.0, "lat": 41.0 }
],
"boats": [
# {"name": "CARIBBEAN 1",
# "imo": "1234567",
# "lon": -30.0,
# "lat": 0.0,
# "status": "En route",
# "links": [
# { "text": "Voir sur VesselFinder", "url": "https://www.vesselfinder.com" },
# { "text": "Détails techniques", "url": "https://example.com/tech" }
# ],
# "actions": [
# { "type": "track", "id": "123", "label": "Suivre ce bateau" },
# { "type": "details", "id": "123", "label": "Voir détails" }
# ]}
],
"cottonStocks": [
# { "name":"Mali","lat": 12.65, "lon": -8.0, "amount": 300 },
# { "name":"Egypte","lat": 30.05, "lon": 31.25, "amount": 500 },
# { "name":"Irak","lat": 33.0, "lon": 44.0, "amount": 150 }
],
"departures": [departure],
"arrivals": [arrival]
}
return "d3:" + json.dumps(data)
@fields.depends('party','from_location','to_location')
def on_change_with_viewer(self):
return self.get_viewer()
def getLots(self):
if self.lines:
if self.lines.lots:
return [l for l in self.lines.lots]
@classmethod
def validate(cls, sales):
super(Sale, cls).validate(sales)
Line = Pool().get('sale.line')
Date = Pool().get('ir.date')
for sale in sales:
for line in sale.lines:
if not line.quantity_theorical and line.quantity > 0:
line.quantity_theorical = Decimal(str(line.quantity)).quantize(Decimal("0.00001"))
Line.save([line])
if line.lots:
line_p = line.get_matched_lines()#line.lots[0].line
if line_p:
for l in line_p:
#compute pnl
Pnl = Pool().get('valuation.valuation')
Pnl.generate(l.lot_p.line)
# if line.quantity_theorical:
# OpenPosition = Pool().get('open.position')
# OpenPosition.create_from_sale_line(line)
if line.price_type == 'basis' and line.lots: #line.price_pricing and line.price_components and
previous_linked_price = line.linked_price
line.sync_linked_price_from_basis()
unit_price = line.get_basis_price()
if unit_price != line.unit_price or line.linked_price != previous_linked_price:
Line = Pool().get('sale.line')
line.unit_price = unit_price
Line.save([line])
if line.price_type == 'efp':
if line.derivatives:
for d in line.derivatives:
line.unit_price = d.price_index.get_price(Date.today(),line.unit,line.currency,True)
Line.save([line])
class PriceComposition(metaclass=PoolMeta):
__name__ = 'price.composition'
sale_line = fields.Many2One('sale.line',"Sale line")
class SaleLine(metaclass=PoolMeta):
__name__ = 'sale.line'
@classmethod
def default_pricing_rule(cls):
try:
Configuration = Pool().get('purchase_trade.configuration')
except KeyError:
return ''
configurations = Configuration.search([], limit=1)
if configurations:
return configurations[0].pricing_rule or ''
return ''
del_period = fields.Many2One('product.month',"Delivery Period")
lots = fields.One2Many('lot.lot','sale_line',"Lots",readonly=True)
fees = fields.One2Many('fee.fee', 'sale_line', 'Fees')
quantity_theorical = fields.Numeric("Th. quantity", digits='unit', readonly=False)
premium = fields.Numeric("Premium/Discount",digits='unit')
price_type = fields.Selection([
('cash', 'Cash Price'),
('priced', 'Priced'),
('basis', 'Basis'),
('efp', 'EFP'),
], 'Price type')
progress = fields.Function(fields.Float("Fix. progress",
states={
'invisible': Eval('price_type') != 'basis',
}),'get_progress')
from_del = fields.Date("From")
to_del = fields.Date("To")
period_at = fields.Selection([
(None, ''),
('laycan', 'Laycan'),
('loading', 'Loading'),
('discharge', 'Discharge'),
('crossing_border', 'Crossing Border'),
('title_transfer', 'Title transfer'),
('arrival', 'Arrival'),
],"Period at")
concentration = fields.Numeric("Concentration")
price_components = fields.One2Many('pricing.component','sale_line',"Components")
mtm = fields.Many2Many('sale.strategy', 'sale_line', 'strategy', 'Mtm Strategy')
derivatives = fields.One2Many('derivative.derivative','sale_line',"Derivatives")
price_pricing = fields.One2Many('pricing.pricing','sale_line',"Pricing")
price_summary = fields.One2Many('sale.pricing.summary','sale_line',"Summary")
estimated_date = fields.One2Many('pricing.estimated','sale_line',"Estimated date")
tol_min = fields.Numeric("Tol - in %",states={
'readonly': (Eval('inherit_tol')),
})
tol_max = fields.Numeric("Tol + in %",states={
'readonly': (Eval('inherit_tol')),
})
tol_min_qt = fields.Numeric("Tol -",states={
'readonly': (Eval('inherit_tol')),
})
tol_max_qt = fields.Numeric("Tol +",states={
'readonly': (Eval('inherit_tol')),
})
inherit_tol = fields.Boolean("Inherit tolerance")
tol_min_v = fields.Function(fields.Numeric("Qt min"),'get_tol_min')
tol_max_v = fields.Function(fields.Numeric("Qt max"),'get_tol_max')
certif = fields.Many2One('purchase.certification',"Certification",states={'readonly': (Eval('inherit_cer')),})
# certification = fields.Selection([
# (None, ''),
# ('bci', 'BCI'),
# ],"Certification",states={'readonly': (Eval('inherit_cer')),})
inherit_cer = fields.Boolean("Inherit certification")
enable_linked_currency = fields.Boolean("Linked currencies")
linked_price = fields.Numeric("Price", digits='unit',states={
'invisible': (~Eval('enable_linked_currency')),
'required': Eval('enable_linked_currency'),
'readonly': Eval('price_type') == 'basis',
}, depends=['enable_linked_currency', 'price_type'])
linked_currency = fields.Many2One('currency.linked',"Currency",states={
'invisible': (~Eval('enable_linked_currency')),
'required': Eval('enable_linked_currency'),
}, depends=['enable_linked_currency'])
linked_unit = fields.Many2One('product.uom', 'Unit',states={
'invisible': (~Eval('enable_linked_currency')),
'required': Eval('enable_linked_currency'),
}, depends=['enable_linked_currency'])
premium = fields.Numeric("Premium/Discount",digits='unit')
fee_ = fields.Many2One('fee.fee',"Fee")
attributes = fields.Dict(
'product.attribute', 'Attributes',
domain=[
('sets', '=', Eval('attribute_set')),
],
states={
'readonly': ~Eval('attribute_set'),
},
depends=['product', 'attribute_set'],
help="Add attributes to the variant."
)
attribute_set = fields.Function(
fields.Many2One('product.attribute.set', "Attribute Set"),
'on_change_with_attribute_set'
)
attributes_name = fields.Function(
fields.Char("Attributes Name"),
'on_change_with_attributes_name'
)
finished = fields.Boolean("Mark as finished")
pricing_rule = fields.Text("Pricing description")
price_composition = fields.One2Many('price.composition','sale_line',"Price composition")
@classmethod
def default_finished(cls):
return False
@property
def report_fixing_rule(self):
pricing_rule = ''
if self.pricing_rule:
pricing_rule = self.pricing_rule
return pricing_rule
@property
def get_pricing_text(self):
parts = []
if self.price_components:
for pc in self.price_components:
if pc.price_index:
price_desc = pc.price_index.price_desc or ''
period_desc = (
pc.price_index.price_period.description
if pc.price_index.price_period else '') or ''
part = ' '.join(
piece for piece in ['ON', price_desc, period_desc]
if piece)
if part:
parts.append(part)
return ' '.join(parts)
@fields.depends('product')
def on_change_with_attribute_set(self, name=None):
if self.product and self.product.template and self.product.template.attribute_set:
return self.product.template.attribute_set.id
@fields.depends('product', 'attributes')
def on_change_with_attributes_name(self, name=None):
if not self.product or not self.product.attribute_set or not self.attributes:
return
def key(attribute):
return getattr(attribute, 'sequence', attribute.name)
values = []
for attribute in sorted(self.product.attribute_set.attributes, key=key):
if attribute.name in self.attributes:
value = self.attributes[attribute.name]
values.append(gettext(
'product_attribute.msg_label_value',
label=attribute.string,
value=attribute.format(value)
))
return " | ".join(filter(None, values))
@classmethod
def default_price_type(cls):
return 'priced'
@classmethod
def default_inherit_tol(cls):
return True
@classmethod
def default_enable_linked_currency(cls):
return False
@classmethod
def default_inherit_cer(cls):
return True
def get_matched_lines(self):
if self.lots:
LotQt = Pool().get('lot.qt')
return LotQt.search([('lot_s','=',self.lots[0].id),('lot_p','>',0)])
def get_date(self,trigger_event):
trigger_date = None
if self.estimated_date:
logger.info("ESTIMATED_DATE:%s",self.estimated_date)
trigger_date = [d.estimated_date for d in self.estimated_date if d.trigger == trigger_event]
logger.info("TRIGGER_DATE:%s",trigger_date)
logger.info("TRIGGER_EVENT:%s",trigger_event)
trigger_date = trigger_date[0] if trigger_date else None
return trigger_date
def get_tol_min(self,name):
if self.inherit_tol:
if self.sale.tol_min and self.quantity_theorical:
return round((1-(self.sale.tol_min/100))*Decimal(self.quantity_theorical),3)
else:
if self.tol_min and self.quantity_theorical:
return round((1-(self.tol_min/100))*Decimal(self.quantity_theorical),3)
def get_tol_max(self,name):
if self.inherit_tol:
if self.sale.tol_max and self.quantity_theorical:
return round((1+(self.sale.tol_max/100))*Decimal(self.quantity_theorical),3)
else:
if self.tol_max and self.quantity_theorical:
return round((1+(self.tol_max/100))*Decimal(self.quantity_theorical),3)
def get_progress(self,name):
PS = Pool().get('sale.pricing.summary')
ps = PS.search(['sale_line','=',self.id])
if ps:
return sum((e.progress if e.progress else 0) * (e.ratio if e.ratio else 0) / 100 for e in ps)
def getVirtualLot(self):
if self.lots:
return [l for l in self.lots if l.lot_type=='virtual'][0]
def _get_linked_unit_factor(self):
if not (self.enable_linked_currency and self.linked_currency):
return None
factor = Decimal(self.linked_currency.factor or 0)
if not factor:
return None
unit_factor = Decimal(1)
if self.linked_unit:
source_unit = getattr(self, 'unit', None)
if not source_unit and self.product:
source_unit = self.product.sale_uom
if not source_unit:
return factor
Uom = Pool().get('product.uom')
unit_factor = Decimal(str(
Uom.compute_qty(source_unit, float(1), self.linked_unit) or 0))
return factor * unit_factor
def _linked_to_line_price(self, price):
factor = self._get_linked_unit_factor()
price = Decimal(price or 0)
if not factor:
return price
return round(price * factor, 4)
def _line_to_linked_price(self, price):
factor = self._get_linked_unit_factor()
price = Decimal(price or 0)
if not factor:
return price
return round(price / factor, 4)
def _get_premium_price(self):
premium = Decimal(self.premium or 0)
if not premium:
return Decimal(0)
if self.enable_linked_currency and self.linked_currency:
return self._linked_to_line_price(premium)
return premium
def get_price(self,lot_premium=0):
return round(
Decimal(self.unit_price or 0)
+ Decimal(lot_premium or 0),
4)
def _get_basis_component_price(self):
price = Decimal(0)
for pc in self.price_components:
PP = Pool().get('sale.pricing.summary')
pp = PP.search([('price_component','=',pc.id),('sale_line','=',self.id)])
if pp:
price += pp[0].price * (pc.ratio / 100)
return round(price,4)
def get_basis_price(self):
return round(self._get_basis_component_price(), 4)
def sync_linked_price_from_basis(self):
if self.enable_linked_currency and self.linked_currency:
self.linked_price = self._line_to_linked_price(
self._get_basis_component_price())
def get_price_linked_currency(self,lot_premium=0):
return round(
self._linked_to_line_price(
Decimal(self.linked_price or 0)
+ Decimal(lot_premium or 0)),
4)
@fields.depends('id','unit','quantity','unit_price','price_pricing','price_type','price_components','estimated_date','lots','fees','enable_linked_currency','linked_price','linked_currency','linked_unit')
def on_change_with_unit_price(self, name=None):
Date = Pool().get('ir.date')
logger.info("ONCHANGEUNITPRICE:%s",self.unit_price)
if self.price_type == 'basis':
self.sync_linked_price_from_basis()
logger.info("ONCHANGEUNITPRICE_IN:%s",self.get_basis_price())
return self.get_basis_price()
if self.enable_linked_currency and self.linked_price and self.linked_currency and self.price_type == 'priced':
return self.get_price_linked_currency()
if self.price_type == 'efp':
if hasattr(self, 'derivatives') and self.derivatives:
for d in self.derivatives:
return d.price_index.get_price(Date.today(),self.unit,self.sale.currency,True)
return self.get_price()
@fields.depends(
'type', 'quantity', 'unit_price', 'unit', 'product',
'sale', '_parent_sale.currency',
'premium', 'enable_linked_currency', 'linked_currency', 'linked_unit')
def on_change_with_amount(self):
if self.type == 'line':
currency = self.sale.currency if self.sale else None
amount = Decimal(str(self.quantity or 0)) * (
Decimal(self.unit_price or 0) + self._get_premium_price())
if currency:
return currency.round(amount)
return amount
return Decimal(0)
@fields.depends(
'unit', 'product', 'price_type', 'enable_linked_currency',
'linked_currency', 'linked_unit', 'linked_price', 'premium',
methods=['on_change_with_unit_price', 'on_change_with_amount'])
def _recompute_trade_price_fields(self):
self.unit_price = self.on_change_with_unit_price()
self.amount = self.on_change_with_amount()
@fields.depends(methods=['_recompute_trade_price_fields'])
def on_change_premium(self):
self._recompute_trade_price_fields()
@fields.depends(methods=['_recompute_trade_price_fields'])
def on_change_price_type(self):
self._recompute_trade_price_fields()
@fields.depends(methods=['_recompute_trade_price_fields'])
def on_change_enable_linked_currency(self):
self._recompute_trade_price_fields()
@fields.depends(methods=['_recompute_trade_price_fields'])
def on_change_linked_price(self):
self._recompute_trade_price_fields()
@fields.depends(methods=['_recompute_trade_price_fields'])
def on_change_linked_currency(self):
self._recompute_trade_price_fields()
@fields.depends(methods=['_recompute_trade_price_fields'])
def on_change_linked_unit(self):
self._recompute_trade_price_fields()
def check_from_to(self,tr):
if tr.pricing_period:
date_from,date_to, d, include, dates = tr.getDateWithEstTrigger(1)
if date_from:
tr.from_p = date_from.date()
if date_to:
tr.to_p = date_to.date()
if tr.application_period:
date_from,date_to, d, include, dates = tr.getDateWithEstTrigger(2)
if date_from:
tr.from_a = date_from.date()
if date_to:
tr.to_a = date_to.date()
TR = Pool().get('pricing.trigger')
TR.save([tr])
def check_pricing(self):
if self.price_components:
for pc in self.price_components:
if not pc.auto:
Pricing = Pool().get('pricing.pricing')
pricings = Pricing.search(['price_component','=',pc.id],order=[('pricing_date', 'ASC')])
if pricings:
cumul_qt = Decimal(0)
base_quantity = self._get_pricing_base_quantity()
index = 0
for pr in pricings:
cumul_qt += pr.quantity
pr.fixed_qt = cumul_qt
pr.fixed_qt_price = pr.get_fixed_price()
pr.unfixed_qt = base_quantity - pr.fixed_qt
pr.unfixed_qt_price = pr.fixed_qt_price
pr.eod_price = pr.get_eod_price_sale()
if index == len(pricings) - 1:
pr.last = True
index += 1
Pricing.save([pr])
if pc.triggers and pc.auto:
prDate = []
prPrice = []
apDate = []
apPrice = []
for t in pc.triggers:
prD, prP = t.getPricingListDates(pc.calendar)
apD, apP = t.getApplicationListDates(pc.calendar)
prDate.extend(prD)
prPrice.extend(prP)
apDate.extend(apD)
apPrice.extend(apP)
if pc.quota_sale:
prPrice = self.get_avg(prPrice)
self.generate_pricing(pc,apDate,prPrice)
def get_avg(self,lprice):
l = len(lprice)
if l > 0 :
cumulprice = float(0)
i = 1
for p in lprice:
if i > 1:
p['avg_minus_1'] = cumulprice / (i-1)
cumulprice += p['price']
p['avg'] = cumulprice / i
i += 1
return lprice
def getnearprice(self,pl,d,t,max_date=None):
if pl:
pl_sorted = sorted(pl, key=lambda x: x['date'])
pminus = pl_sorted[0]
if not max_date:
max_date = d.date()
for p in pl_sorted:
if p['date'].date() == d.date():
if p['isAvg'] and t == 'avg':
return p[t]
if not p['isAvg'] and t == 'avg':
return p['price']
elif p['date'].date() > d.date():
if pminus != p:
return pminus[t]
else:
return Decimal(0)
pminus = p
return pl_sorted[len(pl)-1][t]
return Decimal(0)
def _get_pricing_base_quantity(self):
quantity = self.quantity_theorical
if quantity is None:
quantity = self.quantity
return Decimal(str(quantity or 0))
def generate_pricing(self,pc,dl,pl):
Pricing = Pool().get('pricing.pricing')
pricing = Pricing.search(['price_component','=',pc.id])
if pricing:
Pricing.delete(pricing)
base_quantity = self._get_pricing_base_quantity()
cumul_qt = 0
index = 0
dl_sorted = sorted(dl)
for d in dl_sorted:
if pc.pricing_date and d.date() > pc.pricing_date:
break
p = Pricing()
p.sale_line = self.id
logger.info("GENEDATE:%s",d)
logger.info("TYPEDATE:%s",type(d))
p.pricing_date = d.date()
p.price_component = pc.id
p.quantity = round(Decimal(pc.quota_sale),4)
price = round(Decimal(self.getnearprice(pl,d,'price',pc.pricing_date)),4)
p.settl_price = price
if price > 0:
cumul_qt += pc.quota_sale
p.fixed_qt = round(Decimal(cumul_qt),4)
p.fixed_qt_price = round(Decimal(self.getnearprice(pl,d,'avg',pc.pricing_date)),4)
#p.fixed_qt_price = p.get_fixed_price()
if p.fixed_qt_price == 0:
p.fixed_qt_price = round(Decimal(self.getnearprice(pl,d,'avg_minus_1',pc.pricing_date)),4)
p.unfixed_qt = round(base_quantity - Decimal(cumul_qt),4)
if p.unfixed_qt < 0.001:
p.unfixed_qt = Decimal(0)
p.fixed_qt = base_quantity
if price > 0:
logger.info("GENERATE_1:%s",price)
p.unfixed_qt_price = price
else:
pr = Decimal(pc.price_index.get_price(p.pricing_date,self.unit,self.sale.currency,True))
pr = round(pr,4)
logger.info("GENERATE_2:%s",pr)
p.unfixed_qt_price = pr
p.eod_price = p.get_eod_price_sale()
if (index == len(dl)-1) or (pc.pricing_date and (index < len(dl)-1 and dl_sorted[index+1].date() > pc.pricing_date)):
p.last = True
logger.info("GENERATE_3:%s",p.unfixed_qt_price)
Pricing.save([p])
index += 1
@classmethod
def write(cls, *args):
Lot = Pool().get('lot.lot')
LotQt = Pool().get('lot.qt')
old_values = {}
for records, values in zip(args[::2], args[1::2]):
if 'quantity_theorical' in values:
for record in records:
old_values[record.id] = record.quantity_theorical
super().write(*args)
lines = sum(args[::2], [])
for line in lines:
if line.id not in old_values:
continue
old = Decimal(old_values[line.id] or 0)
new = Decimal(line.quantity_theorical or 0)
delta = new - old
if delta == 0:
continue
virtual_lots = [
lot for lot in (line.lots or [])
if lot.lot_type == 'virtual'
]
if not virtual_lots:
continue
vlot = virtual_lots[0]
lqts = LotQt.search([
('lot_s', '=', vlot.id),
('lot_p', '=', None),
('lot_shipment_in', '=', None),
('lot_shipment_internal', '=', None),
('lot_shipment_out', '=', None),
])
if delta > 0:
new_qty = round(
Decimal(vlot.get_current_quantity_converted() or 0) + delta,
5)
vlot.set_current_quantity(new_qty, new_qty, 1)
Lot.save([vlot])
if lqts:
lqt = lqts[0]
lqt.lot_quantity = round(
Decimal(lqt.lot_quantity or 0) + delta, 5)
LotQt.save([lqt])
else:
lqt = LotQt()
lqt.lot_p = None
lqt.lot_s = vlot.id
lqt.lot_quantity = round(delta, 5)
lqt.lot_unit = line.unit
LotQt.save([lqt])
elif delta < 0:
decrease = abs(delta)
if not lqts or Decimal(lqts[0].lot_quantity or 0) < decrease:
raise UserError("Please unlink or unmatch lot")
new_qty = round(
Decimal(vlot.get_current_quantity_converted() or 0)
- decrease,
5)
vlot.set_current_quantity(new_qty, new_qty, 1)
Lot.save([vlot])
lqt = lqts[0]
lqt.lot_quantity = round(
Decimal(lqt.lot_quantity or 0) - decrease, 5)
LotQt.save([lqt])
@classmethod
def delete(cls, lines):
pool = Pool()
LotQt = pool.get('lot.qt')
Valuation = pool.get('valuation.valuation')
OpenPosition = pool.get('open.position')
for line in lines:
if line.lots:
vlot_s = line.lots[0].getVlot_s()
lqts = LotQt.search([('lot_s','=',vlot_s.id),('lot_p','!=',None),('lot_quantity','>',0)])
if lqts:
raise UserError("You cannot delete matched sale")
return
lqts = LotQt.search([('lot_s','=',vlot_s.id)])
LotQt.delete(lqts)
valuations = Valuation.search([('lot','in',line.lots)])
if valuations:
Valuation.delete(valuations)
# op = OpenPosition.search(['sale_line','=',line.id])
# if op:
# OpenPosition.delete(op)
super(SaleLine, cls).delete(lines)
@classmethod
def copy(cls, lines, default=None):
if default is None:
default = {}
else:
default = default.copy()
default.setdefault('lots', None)
default.setdefault('quantity', Decimal(0))
default.setdefault('quantity_theorical', None)
default.setdefault('price_pricing', None)
return super().copy(lines, default=default)
@classmethod
def validate(cls, salelines):
LotQtHist = Pool().get('lot.qt.hist')
LotQtType = Pool().get('lot.qt.type')
Pnl = Pool().get('valuation.valuation')
super(SaleLine, cls).validate(salelines)
for line in salelines:
if line.price_components:
for pc in line.price_components:
if pc.triggers:
for tr in pc.triggers:
line.check_from_to(tr)
line.check_pricing()
#no lot need to create one with line quantity
logger.info("FROM_VALIDATE_LINE:%s",line.created_by_code)
if not line.created_by_code:
if not line.lots and line.product.type != 'service' and line.quantity != Decimal(0):
Lot = Pool().get('lot.lot')
lot = Lot()
lot.sale_line = line.id
lot.lot_qt = line.quantity
lot.lot_unit_line = line.unit
lot.lot_quantity = Decimal(str(line.quantity)).quantize(Decimal("0.00001"))#round(line.quantity,5)
lot.lot_status = 'forecast'
lot.lot_type = 'virtual'
lot.lot_product = line.product
lqtt = LotQtType.search([('sequence','=',1)])
if lqtt:
lqh = LotQtHist()
lqh.quantity_type = lqtt[0]
lqh.quantity = lot.lot_quantity
lqh.gross_quantity = lot.lot_quantity
lot.lot_hist = [lqh]
if line.quantity > 0:
Lot.save([lot])
#check if fees need to be updated
if line.fees:
for fee in line.fees:
fl_check = FeeLots.search([('fee','=',fee.id),('lot','=',lot.id),('sale_line','=',line.id)])
if not fl_check:
fl = FeeLots()
fl.fee = fee.id
fl.lot = lot.id
fl.sale_line = line.id
FeeLots.save([fl])
#generate valuation for purchase and sale
LotQt = Pool().get('lot.qt')
line = cls(line.id)
generated_purchase_side = False
if line.lots:
for lot in line.lots:
lqts = LotQt.search([('lot_s','=',lot.id),('lot_p','>',0)])
logger.info("VALIDATE_SL:%s",lqts)
if lqts:
generated_purchase_side = True
purchase_lines = [e.lot_p.line for e in lqts]
if purchase_lines:
for pl in purchase_lines:
Pnl.generate(pl)
if line.lots and not generated_purchase_side:
Pnl.generate_from_sale_line(line)
class SaleCreatePurchase(Wizard):
"Create mirror purchase"
__name__ = "sale.create.mirror"
start = StateTransition()
purchase = StateView(
'sale.create.input',
'purchase_trade.create_purchase_view_form', [
Button("Cancel", 'end', 'tryton-cancel'),
Button("Create", 'creating', 'tryton-ok', default=True),
])
creating = StateTransition()
def transition_start(self):
return 'purchase'
def transition_creating(self):
Purchase = Pool().get('purchase.purchase')
PL = Pool().get('purchase.line')
LotQt = Pool().get('lot.qt')
p = None
pl = None
for r in self.records:
if r.lines:
p = Purchase()
p.party = self.purchase.party
p.incoterm = self.purchase.incoterm
p.payment_term = self.purchase.payment_term
p.from_location = self.purchase.from_location
p.to_location = self.purchase.to_location
Purchase.save([p])
pl = PL()
pl.quantity = r.lines[0].quantity
pl.unit = r.lines[0].unit
pl.product = r.lines[0].product
pl.unit_price = self.purchase.unit_price
pl.currency = self.purchase.currency
pl.purchase = p.id
PL.save([pl])
#Match if requested
if self.purchase.match:
#Increase forecasted virtual part matched
if pl:
if pl.lots:
qt = Decimal(pl.quantity)
vlot_p = pl.getVirtualLot()
vlot_s = self.records[0].lines[0].getVirtualLot()
if not vlot_p.updateVirtualPart(None,qt,vlot_p,None,vlot_s):
vlot_p.createVirtualPart(qt,pl.unit,vlot_p,None,vlot_s)
#Decrease forecasted virtual part non matched
lqts = LotQt.search([('lot_p','=',vlot_p)])
if lqts:
vlot_p.updateVirtualPart(lqts[0],-qt)
lqts = LotQt.search([('lot_s','=',vlot_s)])
if lqts:
vlot_p.updateVirtualPart(lqts[0],-qt)
return 'end'
def end(self):
return 'reload'
class SaleCreatePurchaseInput(ModelView):
"Create purchase mirror"
__name__ = "sale.create.input"
party = fields.Many2One('party.party',"Supplier")
incoterm = fields.Many2One('incoterm.incoterm',"Incoterm",domain=[('location', '=', False)])
payment_term = fields.Many2One('account.invoice.payment_term', "Payment Term")
from_location = fields.Many2One('stock.location',"From location")
to_location = fields.Many2One('stock.location',"To location")
unit_price = fields.Numeric("Price")
currency = fields.Many2One('currency.currency',"Currency")
match = fields.Boolean("Match open quantity")
class Derivative(metaclass=PoolMeta):
"Derivative"
__name__ = 'derivative.derivative'
sale = fields.Many2One('sale.sale',"Sale")
sale_line = fields.Many2One('sale.line',"Line")
class Valuation(metaclass=PoolMeta):
"Valuation"
__name__ = 'valuation.valuation'
sale = fields.Many2One('sale.sale',"Sale")
sale_line = fields.Many2One('sale.line',"Line")
class ValuationLine(metaclass=PoolMeta):
"Last Valuation"
__name__ = 'valuation.valuation.line'
sale = fields.Many2One('sale.sale',"Sale")
sale_line = fields.Many2One('sale.line',"Line")
class ValuationReport(metaclass=PoolMeta):
"Valuation Report"
__name__ = 'valuation.report'
sale = fields.Many2One('sale.sale',"Sale")
sale_line = fields.Many2One('sale.line',"Line")
class ValuationDyn(metaclass=PoolMeta):
"Valuation"
__name__ = 'valuation.valuation.dyn'
r_sale = fields.Many2One('sale.sale',"Sale")
r_sale_line = fields.Many2One('sale.line',"Line")
@classmethod
def table_query(cls):
Valuation = Pool().get('valuation.valuation')
val = Valuation.__table__()
context = Transaction().context
group_pnl = context.get('group_pnl')
wh = (val.id > 0)
query = val.select(
Literal(0).as_('create_uid'),
CurrentTimestamp().as_('create_date'),
Literal(None).as_('write_uid'),
Literal(None).as_('write_date'),
Max(val.id).as_('id'),
Max(val.purchase).as_('r_purchase'),
Max(val.sale).as_('r_sale'),
Max(val.line).as_('r_line'),
Max(val.date).as_('r_date'),
Literal(None).as_('r_type'),
Max(val.reference).as_('r_reference'),
Literal(None).as_('r_counterparty'),
Max(val.product).as_('r_product'),
Literal(None).as_('r_state'),
Avg(val.price).as_('r_price'),
Max(val.currency).as_('r_currency'),
Sum(val.quantity).as_('r_quantity'),
Max(val.unit).as_('r_unit'),
Sum(val.amount).as_('r_amount'),
Sum(val.base_amount).as_('r_base_amount'),
Sum(val.rate).as_('r_rate'),
Avg(val.mtm_price).as_('r_mtm_price'),
Sum(val.mtm).as_('r_mtm'),
Max(val.strategy).as_('r_strategy'),
Max(val.lot).as_('r_lot'),
Max(val.sale_line).as_('r_sale_line'),
where=wh,
group_by=[val.purchase,val.sale])
return query
class Fee(metaclass=PoolMeta):
"Fee"
__name__ = 'fee.fee'
sale_line = fields.Many2One('sale.line',"Line")
class SaleAllocationsWizard(Wizard):
'Open Allocations report from Sale without modal'
__name__ = 'sale.allocations.wizard'
start_state = 'open_report'
open_report = StateAction('purchase_trade.act_lot_report_form')
def do_open_report(self, action):
sale_id = Transaction().context.get('active_id')
if not sale_id:
raise ValueError("No active sale ID in context")
action['context_model'] = 'lot.context'
action['pyson_context'] = PYSONEncoder().encode({
'sale': sale_id,
})
return action, {}