Files
tradon/modules/purchase_trade/pricing.py
2026-04-10 07:52:59 +02:00

814 lines
29 KiB
Python
Executable File

# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
from trytond.model import fields
from trytond.pool import Pool, PoolMeta
from trytond.pyson import Bool, Eval, Id
from trytond.model import (ModelSQL, ModelView)
from trytond.tools import is_full_text, lstrip_wildcard
from trytond.transaction import Transaction, inactive_records
from decimal import getcontext, Decimal, ROUND_HALF_UP
from sql.aggregate import Count, Max, Min, Sum, Avg, BoolOr
from sql.conditionals import Case
from sql import Column, Literal
from sql.functions import CurrentTimestamp, DateTrunc
from trytond.wizard import Button, StateTransition, StateView, Wizard
from itertools import chain, groupby
from operator import itemgetter
import datetime
import logging
from trytond.modules.purchase_trade.purchase import (TRIGGERS)
logger = logging.getLogger(__name__)
DAYTYPES = [
(None,''),
('before', 'Nb days before'),
('after', 'Nb days after'),
('first', 'First day'),
('last', 'Last day'),
('xth', 'Nth day'),
]
WEEKDAY_MAP = {
'monday': 0,
'tuesday': 1,
'wednesday': 2,
'thursday': 3,
'friday': 4,
'saturday': 5,
'sunday': 6
}
DAYS = [
(None,''),
('monday', 'Monday'),
('tuesday', 'Tuesday'),
('wednesday', 'Wednesday'),
('thursday', 'Thursday'),
('friday', 'Friday'),
('saturday', 'Saturday'),
('sunday', 'Sunday'),
]
class Estimated(ModelSQL, ModelView):
"Estimated date"
__name__ = 'pricing.estimated'
trigger = fields.Selection(TRIGGERS,"Trigger")
estimated_date = fields.Date("Estimated date")
fin_int_delta = fields.Integer("Financing interests delta")
class MtmScenario(ModelSQL, ModelView):
"MtM Scenario"
__name__ = 'mtm.scenario'
name = fields.Char("Scenario", required=True)
valuation_date = fields.Date("Valuation Date", required=True)
use_last_price = fields.Boolean("Use Last Available Price")
calendar = fields.Many2One(
'price.calendar', "Calendar"
)
class MtmStrategy(ModelSQL, ModelView):
"Mark to Market Strategy"
__name__ = 'mtm.strategy'
name = fields.Char("Name", required=True)
active = fields.Boolean("Active")
scenario = fields.Many2One(
'mtm.scenario', "Scenario", required=True
)
currency = fields.Many2One(
'currency.currency', "Valuation Currency"
)
components = fields.One2Many(
'pricing.component', 'strategy', "Components"
)
@classmethod
def default_active(cls):
return True
def get_mtm(self,line,qty):
pool = Pool()
Currency = pool.get('currency.currency')
total = Decimal(0)
scenario = self.scenario
dt = scenario.valuation_date
for comp in self.components:
value = Decimal(0)
if comp.price_source_type == 'curve' and comp.price_index:
value = Decimal(
comp.price_index.get_price(
dt,
line.unit,
self.currency,
last=scenario.use_last_price
)
)
elif comp.price_source_type == 'matrix' and comp.price_matrix:
value = self._get_matrix_price(comp, line, dt)
if comp.ratio:
value *= Decimal(comp.ratio) / Decimal(100)
total += value * qty
return Decimal(str(total)).quantize(Decimal("0.01"))
def _get_matrix_price(self, comp, line, dt):
MatrixLine = Pool().get('price.matrix.line')
domain = [
('matrix', '=', comp.price_matrix.id),
]
if line:
domain += [
('origin', '=', line.purchase.from_location),
('destination', '=', line.purchase.to_location),
]
lines = MatrixLine.search(domain)
if lines:
return Decimal(lines[0].price_value)
return Decimal(0)
def run_daily_mtm():
Strategy = Pool().get('mtm.strategy')
Snapshot = Pool().get('mtm.snapshot')
for strat in Strategy.search([('active', '=', True)]):
amount = strat.compute_mtm()
Snapshot.create([{
'strategy': strat.id,
'valuation_date': strat.scenario.valuation_date,
'amount': amount,
'currency': strat.currency.id,
}])
class Mtm(ModelSQL, ModelView):
"MtM Component"
__name__ = 'mtm.component'
strategy = fields.Many2One(
'mtm.strategy', "Strategy",
required=True, ondelete='CASCADE'
)
name = fields.Char("Component", required=True)
component_type = fields.Selection([
('commodity', 'Commodity'),
('freight', 'Freight'),
('quality', 'Quality'),
('fx', 'FX'),
('storage', 'Storage'),
('other', 'Other'),
], "Type", required=True)
fix_type = fields.Many2One('price.fixtype', "Fixation Type")
price_source_type = fields.Selection([
('curve', 'Curve'),
('matrix', 'Matrix'),
('manual', 'Manual'),
], "Price Source", required=True)
price_index = fields.Many2One('price.price', "Price Curve")
price_matrix = fields.Many2One('price.matrix', "Price Matrix")
ratio = fields.Numeric("Ratio / %", digits=(16, 6))
manual_price = fields.Numeric(
"Manual Price",
digits=(16, 6),
help="Price set manually if price_source_type is 'manual'"
)
currency = fields.Many2One('currency.currency', "Currency")
def get_cur(self, name=None):
if self.price_index:
return self.price_index.price_currency
if self.price_matrix:
return self.price_matrix.currency
return None
@fields.depends('price_index','price_matrix')
def on_change_with_currency(self):
return self.get_cur()
class PriceMatrix(ModelSQL, ModelView):
"Price Matrix"
__name__ = 'price.matrix'
name = fields.Char("Name", required=True)
matrix_type = fields.Selection([
('freight', 'Freight'),
('location', 'Location Spread'),
('quality', 'Quality'),
('storage', 'Storage'),
('other', 'Other'),
], "Matrix Type", required=True)
unit = fields.Many2One('product.uom', "Unit")
currency = fields.Many2One('currency.currency', "Currency")
calendar = fields.Many2One(
'price.calendar', "Calendar"
)
valid_from = fields.Date("Valid From")
valid_to = fields.Date("Valid To")
lines = fields.One2Many(
'price.matrix.line', 'matrix', "Lines"
)
class PriceMatrixLine(ModelSQL, ModelView):
"Price Matrix Line"
__name__ = 'price.matrix.line'
matrix = fields.Many2One(
'price.matrix', "Matrix",
required=True, ondelete='CASCADE'
)
origin = fields.Many2One('stock.location', "Origin")
destination = fields.Many2One('stock.location', "Destination")
product = fields.Many2One('product.product', "Product")
quality = fields.Many2One('product.category', "Quality")
price_value = fields.Numeric("Price", digits=(16, 6))
class MtmSnapshot(ModelSQL, ModelView):
"MtM Snapshot"
__name__ = 'mtm.snapshot'
strategy = fields.Many2One(
'mtm.strategy', "Strategy",
required=True, ondelete='CASCADE'
)
valuation_date = fields.Date("Valuation Date", required=True)
amount = fields.Numeric("MtM Amount", digits=(16, 6))
currency = fields.Many2One('currency.currency', "Currency")
created_at = fields.DateTime("Created At")
class Component(ModelSQL, ModelView):
"Component"
__name__ = 'pricing.component'
strategy = fields.Many2One(
'mtm.strategy', "Strategy",
required=False, ondelete='CASCADE'
)
price_source_type = fields.Selection([
('curve', 'Curve'),
('matrix', 'Matrix'),
# ('manual', 'Manual'),
], "Price Source", required=True)
fix_type = fields.Many2One('price.fixtype',"Fixation type")
ratio = fields.Numeric("%",digits=(16,7))
price_index = fields.Many2One('price.price',"Curve")
price_matrix = fields.Many2One('price.matrix', "Price Matrix")
currency = fields.Function(fields.Many2One('currency.currency',"Curr."),'get_cur')
auto = fields.Boolean("Auto")
fallback = fields.Boolean("Fallback")
calendar = fields.Many2One('price.calendar',"Calendar")
nbdays = fields.Function(fields.Integer("Nb days"),'get_nbdays')
triggers = fields.One2Many('pricing.trigger','component',"Period rules")
pricing_date = fields.Date("Pricing date max")
def get_rec_name(self, name=None):
if self.price_index:
return '[' + self.fix_type.name + '] ' + self.price_index.price_index
else:
return '[' + self.fix_type.name + '] '
def get_cur(self,name):
if self.price_index:
PI = Pool().get('price.price')
pi = PI(self.price_index)
return pi.price_currency
def get_nbdays(self, name):
days = 0
if self.triggers:
for t in self.triggers:
l,l2 = t.getApplicationListDates(self.calendar)
days += len(l)
return days
@classmethod
def delete(cls, components):
for cp in components:
Pricing = Pool().get('pricing.pricing')
pricings = Pricing.search(['price_component','=',cp.id])
if pricings:
Pricing.delete(pricings)
super(Component, cls).delete(components)
class Pricing(ModelSQL,ModelView):
"Pricing"
__name__ = 'pricing.pricing'
pricing_date = fields.Date("Date")
price_component = fields.Many2One('pricing.component', "Component")#, domain=[('id', 'in', Eval('line.price_components'))], ondelete='CASCADE')
quantity = fields.Numeric("Qt",digits='unit')
settl_price = fields.Numeric("Settl. price",digits='unit')
fixed_qt = fields.Numeric("Fixed qt",digits='unit', readonly=True)
fixed_qt_price = fields.Numeric("Fixed qt price",digits='unit', readonly=True)
unfixed_qt = fields.Numeric("Unfixed qt",digits='unit', readonly=True)
unfixed_qt_price = fields.Numeric("Unfixed qt price",digits='unit', readonly=True)
eod_price = fields.Numeric("EOD price",digits='unit',readonly=True)
last = fields.Boolean("Last")
@classmethod
def default_fixed_qt(cls):
return Decimal(0)
@classmethod
def default_unfixed_qt(cls):
return Decimal(0)
@classmethod
def default_fixed_qt_price(cls):
return Decimal(0)
@classmethod
def default_unfixed_qt_price(cls):
return Decimal(0)
@classmethod
def default_quantity(cls):
return Decimal(0)
@classmethod
def default_settl_price(cls):
return Decimal(0)
@classmethod
def default_eod_price(cls):
return Decimal(0)
@staticmethod
def _weighted_average_price(fixed_qt, fixed_price, unfixed_qt, unfixed_price):
fixed_qt = Decimal(str(fixed_qt or 0))
fixed_price = Decimal(str(fixed_price or 0))
unfixed_qt = Decimal(str(unfixed_qt or 0))
unfixed_price = Decimal(str(unfixed_price or 0))
total_qty = fixed_qt + unfixed_qt
if total_qty == 0:
return Decimal(0)
return round(
((fixed_qt * fixed_price) + (unfixed_qt * unfixed_price)) / total_qty,
4,
)
def compute_eod_price(self):
if getattr(self, 'sale_line', None) and hasattr(self, 'get_eod_price_sale'):
return self.get_eod_price_sale()
if getattr(self, 'line', None) and hasattr(self, 'get_eod_price_purchase'):
return self.get_eod_price_purchase()
return self._weighted_average_price(
self.fixed_qt,
self.fixed_qt_price,
self.unfixed_qt,
self.unfixed_qt_price,
)
@fields.depends('fixed_qt', 'fixed_qt_price', 'unfixed_qt', 'unfixed_qt_price')
def on_change_fixed_qt(self):
self.eod_price = self.compute_eod_price()
@fields.depends('fixed_qt', 'fixed_qt_price', 'unfixed_qt', 'unfixed_qt_price')
def on_change_fixed_qt_price(self):
self.eod_price = self.compute_eod_price()
@fields.depends('fixed_qt', 'fixed_qt_price', 'unfixed_qt', 'unfixed_qt_price')
def on_change_unfixed_qt(self):
self.eod_price = self.compute_eod_price()
@fields.depends('fixed_qt', 'fixed_qt_price', 'unfixed_qt', 'unfixed_qt_price')
def on_change_unfixed_qt_price(self):
self.eod_price = self.compute_eod_price()
@classmethod
def create(cls, vlist):
records = super(Pricing, cls).create(vlist)
cls._sync_manual_values(records)
cls._sync_manual_last(records)
cls._sync_eod_price(records)
return records
@classmethod
def write(cls, *args):
super(Pricing, cls).write(*args)
if (Transaction().context.get('skip_pricing_eod_sync')
or Transaction().context.get('skip_pricing_last_sync')):
return
records = []
actions = iter(args)
for record_set, values in zip(actions, actions):
if values:
records.extend(record_set)
cls._sync_manual_values(records)
cls._sync_manual_last(records)
cls._sync_eod_price(records)
@classmethod
def _sync_eod_price(cls, records):
if not records:
return
with Transaction().set_context(skip_pricing_eod_sync=True):
for record in records:
eod_price = record.compute_eod_price()
if Decimal(str(record.eod_price or 0)) == Decimal(str(eod_price or 0)):
continue
super(Pricing, cls).write([record], {
'eod_price': eod_price,
})
@classmethod
def _is_manual_pricing_record(cls, record):
component = getattr(record, 'price_component', None)
if component is None:
return True
return not bool(getattr(component, 'auto', False))
@classmethod
def _get_pricing_group_domain(cls, record):
component = getattr(record, 'price_component', None)
if getattr(record, 'sale_line', None):
return [
('sale_line', '=', record.sale_line.id),
('price_component', '=',
component.id if getattr(component, 'id', None) else None),
]
if getattr(record, 'line', None):
return [
('line', '=', record.line.id),
('price_component', '=',
component.id if getattr(component, 'id', None) else None),
]
return None
@classmethod
def _get_base_quantity(cls, record):
owner = getattr(record, 'sale_line', None) or getattr(record, 'line', None)
if not owner:
return Decimal(0)
if hasattr(owner, '_get_pricing_base_quantity'):
return Decimal(str(owner._get_pricing_base_quantity() or 0))
quantity = getattr(owner, 'quantity_theorical', None)
if quantity is None:
quantity = getattr(owner, 'quantity', None)
return Decimal(str(quantity or 0))
@classmethod
def _sync_manual_values(cls, records):
if (not records
or Transaction().context.get('skip_pricing_manual_sync')):
return
domains = []
seen = set()
for record in records:
if not cls._is_manual_pricing_record(record):
continue
domain = cls._get_pricing_group_domain(record)
if not domain:
continue
key = tuple(domain)
if key in seen:
continue
seen.add(key)
domains.append(domain)
if not domains:
return
with Transaction().set_context(
skip_pricing_manual_sync=True,
skip_pricing_last_sync=True,
skip_pricing_eod_sync=True):
for domain in domains:
pricings = cls.search(
domain,
order=[('pricing_date', 'ASC'), ('id', 'ASC')])
if not pricings:
continue
base_quantity = cls._get_base_quantity(pricings[0])
cumul_qt = Decimal(0)
cumul_qt_price = Decimal(0)
total = len(pricings)
for index, pricing in enumerate(pricings):
quantity = Decimal(str(pricing.quantity or 0))
settl_price = Decimal(str(pricing.settl_price or 0))
cumul_qt += quantity
cumul_qt_price += quantity * settl_price
fixed_qt = cumul_qt
if fixed_qt > 0:
fixed_qt_price = round(cumul_qt_price / fixed_qt, 4)
else:
fixed_qt_price = Decimal(0)
unfixed_qt = base_quantity - fixed_qt
if unfixed_qt < Decimal('0.001'):
unfixed_qt = Decimal(0)
fixed_qt = base_quantity
values = {
'fixed_qt': fixed_qt,
'fixed_qt_price': fixed_qt_price,
'unfixed_qt': unfixed_qt,
'unfixed_qt_price': settl_price,
'last': index == (total - 1),
}
eod_price = cls._weighted_average_price(
values['fixed_qt'],
values['fixed_qt_price'],
values['unfixed_qt'],
values['unfixed_qt_price'],
)
values['eod_price'] = eod_price
super(Pricing, cls).write([pricing], values)
@classmethod
def _get_manual_last_group_domain(cls, record):
return cls._get_pricing_group_domain(record)
@classmethod
def _sync_manual_last(cls, records):
if not records:
return
domains = []
seen = set()
for record in records:
domain = cls._get_manual_last_group_domain(record)
if not domain:
continue
key = tuple(domain)
if key in seen:
continue
seen.add(key)
domains.append(domain)
if not domains:
return
with Transaction().set_context(
skip_pricing_last_sync=True,
skip_pricing_eod_sync=True):
for domain in domains:
pricings = cls.search(
domain,
order=[('pricing_date', 'ASC'), ('id', 'ASC')])
if not pricings:
continue
last_pricing = pricings[-1]
for pricing in pricings[:-1]:
if pricing.last:
super(Pricing, cls).write([pricing], {'last': False})
if not last_pricing.last:
super(Pricing, cls).write([last_pricing], {'last': True})
def get_fixed_price(self):
price = Decimal(0)
Pricing = Pool().get('pricing.pricing')
domain = self._get_pricing_group_domain(self)
if not domain:
return price
pricings = Pricing.search(domain, order=[('pricing_date', 'ASC'), ('id', 'ASC')])
if pricings:
cumul_qt = Decimal(0)
cumul_qt_price = Decimal(0)
for pr in pricings:
quantity = Decimal(str(pr.quantity or 0))
settl_price = Decimal(str(pr.settl_price or 0))
cumul_qt += quantity
cumul_qt_price += quantity * settl_price
if pr.id == self.id:
break
if cumul_qt > 0:
price = cumul_qt_price / cumul_qt
return round(price,4)
class Trigger(ModelSQL,ModelView):
"Period rules"
__name__ = "pricing.trigger"
component = fields.Many2One('pricing.component',"Component", ondelete='CASCADE')
pricing_period = fields.Many2One('pricing.period',"Pricing period")
from_p = fields.Date("From",
states={
'readonly': Eval('pricing_period') != None,
})
to_p = fields.Date("To",
states={
'readonly': Eval('pricing_period') != None,
})
average = fields.Boolean("Avg")
last = fields.Boolean("Last")
application_period = fields.Many2One('pricing.period',"Application period")
from_a = fields.Date("From",
states={
'readonly': Eval('application_period') != None,
})
to_a = fields.Date("To",
states={
'readonly': Eval('application_period') != None,
})
@fields.depends('pricing_period')
def on_change_with_application_period(self):
if not self.application_period and self.pricing_period:
return self.pricing_period
def getDateWithEstTrigger(self, period):
PP = Pool().get('pricing.period')
if period == 1:
pp = PP(self.pricing_period)
else:
pp = PP(self.application_period)
CO = Pool().get('pricing.component')
co = CO(self.component)
if co.line:
d = co.getEstimatedTriggerPurchase(pp.trigger)
else:
d = co.getEstimatedTriggerSale(pp.trigger)
date_from,date_to,dates = pp.getDates(d)
return date_from,date_to,d,pp.include,dates
def getApplicationListDates(self, cal):
ld = []
if self.application_period:
date_from, date_to, d, include,dates = self.getDateWithEstTrigger(2)
else:
date_from = self.from_a
date_to = self.to_a
d = None
include = False
ld, lprice = self.getListDates(date_from,date_to,d,include,cal,2,dates)
return ld, lprice
def getPricingListDates(self,cal):
ld = []
if self.pricing_period:
date_from, date_to, d, include,dates = self.getDateWithEstTrigger(1)
else:
date_from = self.from_p#datetime.datetime(self.from_p.year, self.from_p.month, self.from_p.day)
date_to = self.to_p#datetime.datetime(self.to_p.year, self.to_p.month, self.to_p.day)
d = None
include = False
ld, lprice = self.getListDates(date_from,date_to,d,include,cal,1,dates)
return ld, lprice
def getListDates(self,df,dt,t,i,cal,pricing,dates):
l = []
lprice = []
CAL = Pool().get('price.calendar')
if cal:
cal = CAL(cal)
if dates:
for d in dates:
if cal.IsQuote(d):
l.append(d)
if pricing == 1:
lprice.append(self.getprice(d))
return l, lprice
if df and dt:
current_date = datetime.datetime(df.year,df.month,df.day)
dt = datetime.datetime(dt.year,dt.month,dt.day)
while current_date <= dt:
if i or (not i and current_date != t):
if cal:
if cal.IsQuote(current_date):
l.append(current_date)
if pricing == 1:
lprice.append(self.getprice(current_date))
else:
l.append(current_date)
if pricing == 1:
lprice.append(self.getprice(current_date))
current_date += datetime.timedelta(days=1)
return l, lprice
def getprice(self,current_date):
PI = Pool().get('price.price')
PC = Pool().get('pricing.component')
pc = PC(self.component)
pi = PI(pc.price_index)
val = {}
val['date'] = current_date
val['price'] = pi.get_price(current_date,pc.line.unit if pc.line else pc.sale_line.unit,pc.line.currency if pc.line else pc.sale_line.currency,self.last)
val['avg'] = val['price']
val['avg_minus_1'] = val['price']
val['isAvg'] = self.average
return val
class Period(ModelSQL,ModelView):
"Period"
__name__ = 'pricing.period'
name = fields.Char("Name")
trigger = fields.Selection(TRIGGERS, 'Trigger')
include = fields.Boolean("Inc.")
startday = fields.Selection(DAYTYPES,"Start day")
nbds = fields.Integer("Nb")
endday = fields.Selection(DAYTYPES,"End day")
nbde = fields.Integer("Nb")
nbms = fields.Integer("Starting month")
nbme = fields.Integer("Ending month")
every = fields.Selection(DAYS,"Every")
nb_quotation = fields.Integer("Nb quotation")
@classmethod
def default_nbds(cls):
return 0
@classmethod
def default_nbde(cls):
return 0
@classmethod
def default_nbms(cls):
return 0
@classmethod
def default_nbme(cls):
return 0
def getDates(self,t):
date_from = None
date_to = None
dates = []
if t:
if self.every:
if t:
j = self.every
if j not in WEEKDAY_MAP:
raise ValueError(f"Invalid day : '{j}'")
weekday_target = WEEKDAY_MAP[j]
if self.trigger == 'delmonth':
first_day = t.replace(day=1)
days_to_add = (weekday_target - first_day.weekday()) % 7
current = first_day + datetime.timedelta(days=days_to_add)
while current.month == t.month:
dates.append(datetime.datetime(current.year, current.month, current.day))
current += datetime.timedelta(days=7)
elif self.nb_quotation > 0:
days_to_add = (weekday_target - t.weekday()) % 7
current = t + datetime.timedelta(days=days_to_add)
while len(dates) < self.nb_quotation:
dates.append(datetime.datetime(current.year, current.month, current.day))
current += datetime.timedelta(days=7)
elif self.nb_quotation < 0:
days_to_sub = (t.weekday() - weekday_target) % 7
current = t - datetime.timedelta(days=days_to_sub)
while len(dates) < -self.nb_quotation:
dates.append(datetime.datetime(current.year, current.month, current.day))
current -= datetime.timedelta(days=7)
else:
if self.startday == 'before':
date_from = t - datetime.timedelta(days=(self.nbds if self.nbds else 0))
elif self.startday == 'after':
date_from = t + datetime.timedelta(days=(self.nbds if self.nbds else 0))
elif self.startday == 'first':
date_from = datetime.datetime(t.year, t.month % 12 + (self.nbms if self.nbms else 0), 1)
elif self.startday == 'last':
date_from = datetime.datetime(t.year, t.month % 12 + 1, 1) - datetime.timedelta(days=1)
elif self.startday == 'xth':
date_from = datetime.datetime(t.year, t.month % 12, (self.nbds if self.nbds else 1))
else:
date_from = datetime.datetime(t.year, t.month, t.day)
if self.endday == 'before':
date_to = t - datetime.timedelta(days=(self.nbde if self.nbde else 0))
elif self.endday == 'after':
date_to = t + datetime.timedelta(days=(self.nbde if self.nbde else 0))
elif self.endday == 'first':
date_to = datetime.datetime(t.year, t.month % 12 + (self.nbme if self.nbme else 0), 1)
elif self.endday == 'last':
date_to = datetime.datetime(t.year, t.month % 12 + 1, 1) - datetime.timedelta(days=1)
elif self.endday == 'xth':
date_to = datetime.datetime(t.year, t.month % 12, (self.nbds if self.nbds else 1))
else:
date_to = date_from
return date_from, date_to, dates