# This file is part of Tryton.  The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
from trytond.model import fields
from trytond.pool import Pool, PoolMeta
from trytond.pyson import Bool, Eval, Id
from trytond.model import (ModelSQL, ModelView)
from trytond.tools import is_full_text, lstrip_wildcard
from trytond.transaction import Transaction, inactive_records
from decimal import getcontext, Decimal, ROUND_HALF_UP
from sql.aggregate import Count, Max, Min, Sum, Avg, BoolOr
from sql.conditionals import Case
from sql import Column, Literal
from sql.functions import CurrentTimestamp, DateTrunc
from trytond.wizard import Button, StateTransition, StateView, Wizard
from itertools import chain, groupby
from operator import itemgetter
import datetime
import logging
from trytond.modules.purchase_trade.purchase import (TRIGGERS)

logger = logging.getLogger(__name__)

# Ways a period boundary can be derived from a trigger date
# (see Period.getDates).
DAYTYPES = [
    (None, ''),
    ('before', 'Nb days before'),
    ('after', 'Nb days after'),
    ('first', 'First day'),
    ('last', 'Last day'),
    ('xth', 'Nth day'),
]

# Selection key -> value returned by date.weekday() (Monday is 0).
WEEKDAY_MAP = {
    'monday': 0,
    'tuesday': 1,
    'wednesday': 2,
    'thursday': 3,
    'friday': 4,
    'saturday': 5,
    'sunday': 6,
}

DAYS = [
    (None, ''),
    ('monday', 'Monday'),
    ('tuesday', 'Tuesday'),
    ('wednesday', 'Wednesday'),
    ('thursday', 'Thursday'),
    ('friday', 'Friday'),
    ('saturday', 'Saturday'),
    ('sunday', 'Sunday'),
]


def _as_date(value):
    """Return the calendar day of *value* (a date, a datetime or None)."""
    if isinstance(value, datetime.datetime):
        return value.date()
    return value


def _month_start(ref, offset=0):
    """Return midnight of the first day of the month *offset* months
    after the month of *ref*, rolling over year boundaries."""
    year, month_index = divmod(ref.year * 12 + ref.month - 1 + offset, 12)
    return datetime.datetime(year, month_index + 1, 1)


# NOTE: class docstrings below are deliberately left as one-liners; Tryton
# uses them as the model description.

class Estimated(ModelSQL, ModelView):
    "Estimated date"
    __name__ = 'pricing.estimated'

    trigger = fields.Selection(TRIGGERS, "Trigger")
    estimated_date = fields.Date("Estimated date")
    fin_int_delta = fields.Integer("Financing interests delta")


class MtmScenario(ModelSQL, ModelView):
    "MtM Scenario"
    __name__ = 'mtm.scenario'

    name = fields.Char("Scenario", required=True)
    valuation_date = fields.Date("Valuation Date", required=True)
    use_last_price = fields.Boolean("Use Last Available Price")
    calendar = fields.Many2One(
        'price.calendar', "Calendar"
    )


class MtmStrategy(ModelSQL, ModelView):
    "Mark to Market Strategy"
    __name__ = 'mtm.strategy'

    name = fields.Char("Name", required=True)
    active = fields.Boolean("Active")
    scenario = fields.Many2One(
        'mtm.scenario', "Scenario", required=True
    )
    currency = fields.Many2One(
        'currency.currency', "Valuation Currency"
    )
    components = fields.One2Many(
        'pricing.component', 'strategy', "Components"
    )

    @classmethod
    def default_active(cls):
        return True

    def get_mtm(self, line, qty):
        """Return the mark-to-market amount of *qty* for *line*.

        Every component contributes ``price * ratio / 100 * qty`` (the
        ratio is skipped when empty), where the price comes from the
        component's curve or matrix at the scenario valuation date.

        :param line: record providing ``unit`` (and ``purchase`` for
            matrix look-ups); may be empty.
        :param qty: quantity as Decimal, int or float.
        :return: the total as a Decimal quantized to 0.01.
        """
        scenario = self.scenario
        valuation_date = scenario.valuation_date
        # Decimal * float raises TypeError, so normalise the quantity once.
        quantity = qty if isinstance(qty, Decimal) else Decimal(str(qty))
        unit = line.unit if line else None

        total = Decimal(0)
        for comp in self.components:
            value = Decimal(0)
            if comp.price_source_type == 'curve' and comp.price_index:
                price = comp.price_index.get_price(
                    valuation_date,
                    unit,
                    self.currency,
                    last=scenario.use_last_price,
                )
                # A curve without quotation must not crash the valuation.
                if price is not None:
                    value = Decimal(price)
            elif comp.price_source_type == 'matrix' and comp.price_matrix:
                value = self._get_matrix_price(comp, line, valuation_date)

            if comp.ratio:
                value *= Decimal(comp.ratio) / Decimal(100)

            total += value * quantity

        return total.quantize(Decimal("0.01"))

    def _get_matrix_price(self, comp, line, dt):
        """Return the first matrix price matching the route of *line*.

        Without a line, any row of the component's matrix matches.
        *dt* is accepted for signature compatibility but not used.
        Returns Decimal(0) when no row matches or the row has no price.
        """
        MatrixLine = Pool().get('price.matrix.line')

        domain = [
            ('matrix', '=', comp.price_matrix.id),
        ]
        if line:
            domain += [
                ('origin', '=', line.purchase.from_location),
                ('destination', '=', line.purchase.to_location),
            ]

        # Only the first row is used, so do not fetch the others.
        rows = MatrixLine.search(domain, limit=1)
        if rows and rows[0].price_value is not None:
            return Decimal(rows[0].price_value)
        return Decimal(0)

    @staticmethod
    def run_daily_mtm():
        """Create one 'mtm.snapshot' per active strategy.

        The function takes no argument; it is exposed both on the class
        and at module level because the original layout does not show
        which of the two callers rely on.
        """
        pool = Pool()
        Strategy = pool.get('mtm.strategy')
        Snapshot = pool.get('mtm.snapshot')

        to_create = []
        for strat in Strategy.search([('active', '=', True)]):
            # NOTE(review): compute_mtm() is not defined in this file;
            # presumably provided by an extension of 'mtm.strategy' --
            # confirm, otherwise this raises AttributeError.
            amount = strat.compute_mtm()
            to_create.append({
                'strategy': strat.id,
                'valuation_date': strat.scenario.valuation_date,
                'amount': amount,
                # The valuation currency is optional on the strategy.
                'currency': strat.currency.id if strat.currency else None,
            })
        if to_create:
            Snapshot.create(to_create)


# Module-level entry point kept for callers importing the function directly.
run_daily_mtm = MtmStrategy.run_daily_mtm


class Mtm(ModelSQL, ModelView):
    "MtM Component"
    __name__ = 'mtm.component'

    strategy = fields.Many2One(
        'mtm.strategy', "Strategy",
        required=True, ondelete='CASCADE'
    )
    name = fields.Char("Component", required=True)
    component_type = fields.Selection([
        ('commodity', 'Commodity'),
        ('freight', 'Freight'),
        ('quality', 'Quality'),
        ('fx', 'FX'),
        ('storage', 'Storage'),
        ('other', 'Other'),
    ], "Type", required=True)
    fix_type = fields.Many2One('price.fixtype', "Fixation Type")
    price_source_type = fields.Selection([
        ('curve', 'Curve'),
        ('matrix', 'Matrix'),
        ('manual', 'Manual'),
    ], "Price Source", required=True)
    price_index = fields.Many2One('price.price', "Price Curve")
    price_matrix = fields.Many2One('price.matrix', "Price Matrix")
    ratio = fields.Numeric("Ratio / %", digits=(16, 6))
    manual_price = fields.Numeric(
        "Manual Price",
        digits=(16, 6),
        help="Price set manually if price_source_type is 'manual'"
    )
    currency = fields.Many2One('currency.currency', "Currency")

    def get_cur(self, name=None):
        """Return the currency of the curve, else of the matrix, else
        None."""
        if self.price_index:
            return self.price_index.price_currency
        if self.price_matrix:
            return self.price_matrix.currency
        return None

    @fields.depends('price_index', 'price_matrix')
    def on_change_with_currency(self):
        return self.get_cur()


class PriceMatrix(ModelSQL, ModelView):
    "Price Matrix"
    __name__ = 'price.matrix'

    name = fields.Char("Name", required=True)
    matrix_type = fields.Selection([
        ('freight', 'Freight'),
        ('location', 'Location Spread'),
        ('quality', 'Quality'),
        ('storage', 'Storage'),
        ('other', 'Other'),
    ], "Matrix Type", required=True)
    unit = fields.Many2One('product.uom', "Unit")
    currency = fields.Many2One('currency.currency', "Currency")
    calendar = fields.Many2One(
        'price.calendar', "Calendar"
    )
    valid_from = fields.Date("Valid From")
    valid_to = fields.Date("Valid To")
    lines = fields.One2Many(
        'price.matrix.line', 'matrix', "Lines"
    )


class PriceMatrixLine(ModelSQL, ModelView):
    "Price Matrix Line"
    __name__ = 'price.matrix.line'

    matrix = fields.Many2One(
        'price.matrix', "Matrix",
        required=True, ondelete='CASCADE'
    )
    origin = fields.Many2One('stock.location', "Origin")
    destination = fields.Many2One('stock.location', "Destination")
    product = fields.Many2One('product.product', "Product")
    quality = fields.Many2One('product.category', "Quality")
    price_value = fields.Numeric("Price", digits=(16, 6))


class MtmSnapshot(ModelSQL, ModelView):
    "MtM Snapshot"
    __name__ = 'mtm.snapshot'

    strategy = fields.Many2One(
        'mtm.strategy', "Strategy",
        required=True, ondelete='CASCADE'
    )
    valuation_date = fields.Date("Valuation Date", required=True)
    amount = fields.Numeric("MtM Amount", digits=(16, 6))
    currency = fields.Many2One('currency.currency', "Currency")
    created_at = fields.DateTime("Created At")


class Component(ModelSQL, ModelView):
    "Component"
    __name__ = 'pricing.component'

    strategy = fields.Many2One(
        'mtm.strategy', "Strategy",
        required=False, ondelete='CASCADE'
    )
    price_source_type = fields.Selection([
        ('curve', 'Curve'),
        ('matrix', 'Matrix'),
        # ('manual', 'Manual'),
    ], "Price Source", required=True)
    fix_type = fields.Many2One('price.fixtype', "Fixation type")
    ratio = fields.Numeric("%", digits=(16, 7))
    price_index = fields.Many2One('price.price', "Curve")
    price_matrix = fields.Many2One('price.matrix', "Price Matrix")
    currency = fields.Function(
        fields.Many2One('currency.currency', "Curr."), 'get_cur')
    auto = fields.Boolean("Auto")
    fallback = fields.Boolean("Fallback")
    calendar = fields.Many2One('price.calendar', "Calendar")
    nbdays = fields.Function(fields.Integer("Nb days"), 'get_nbdays')
    triggers = fields.One2Many('pricing.trigger', 'component', "Period rules")
    pricing_date = fields.Date("Pricing date max")

    def get_rec_name(self, name=None):
        """Return '[<fixation type>] <curve index>'.

        Both the fixation type and the curve are optional, so a missing
        one is rendered as an empty string instead of raising.
        """
        fix_name = self.fix_type.name if self.fix_type else ''
        label = '[' + fix_name + '] '
        if self.price_index:
            return label + self.price_index.price_index
        return label

    def get_cur(self, name):
        """Return the currency of the curve, or None without curve."""
        if self.price_index:
            return self.price_index.price_currency
        return None

    def get_nbdays(self, name):
        """Return the number of application dates over all triggers."""
        days = 0
        for trigger in self.triggers or []:
            application_dates, _ = trigger.getApplicationListDates(
                self.calendar)
            days += len(application_dates)
        return days

    @classmethod
    def delete(cls, components):
        """Delete *components* together with the pricings that
        reference them."""
        Pricing = Pool().get('pricing.pricing')
        # One query for all components instead of one per record.
        pricings = Pricing.search([
            ('price_component', 'in', [cp.id for cp in components]),
        ])
        if pricings:
            Pricing.delete(pricings)
        super().delete(components)


class Pricing(ModelSQL, ModelView):
    "Pricing"
    __name__ = 'pricing.pricing'

    pricing_date = fields.Date("Date")
    price_component = fields.Many2One('pricing.component', "Component")
    # , domain=[('id', 'in', Eval('line.price_components'))],
    # ondelete='CASCADE')
    quantity = fields.Numeric("Qt", digits='unit')
    settl_price = fields.Numeric("Settl. price", digits='unit')
    fixed_qt = fields.Numeric("Fixed qt", digits='unit', readonly=True)
    fixed_qt_price = fields.Numeric(
        "Fixed qt price", digits='unit', readonly=True)
    unfixed_qt = fields.Numeric("Unfixed qt", digits='unit', readonly=True)
    unfixed_qt_price = fields.Numeric(
        "Unfixed qt price", digits='unit', readonly=True)
    eod_price = fields.Numeric("EOD price", digits='unit', readonly=True)
    last = fields.Boolean("Last")

    @classmethod
    def default_fixed_qt(cls):
        return Decimal(0)

    @classmethod
    def default_unfixed_qt(cls):
        return Decimal(0)

    @classmethod
    def default_fixed_qt_price(cls):
        return Decimal(0)

    @classmethod
    def default_unfixed_qt_price(cls):
        return Decimal(0)

    @classmethod
    def default_quantity(cls):
        return Decimal(0)

    @classmethod
    def default_settl_price(cls):
        return Decimal(0)

    @classmethod
    def default_eod_price(cls):
        return Decimal(0)

    def get_fixed_price(self):
        """Return the quantity-weighted average settlement price of the
        component's pricings up to and including this one.

        Pricings are taken in ascending ``pricing_date`` order.  Empty
        quantities or prices count as zero.  The result is rounded to 4
        decimals and is zero when nothing has been priced yet.
        """
        price = Decimal(0)
        if not self.price_component:
            return round(price, 4)

        Pricing = Pool().get('pricing.pricing')
        pricings = Pricing.search(
            [('price_component', '=', self.price_component.id)],
            order=[('pricing_date', 'ASC')])

        cumul_qt = Decimal(0)
        cumul_qt_price = Decimal(0)
        for pr in pricings:
            pr_quantity = pr.quantity or Decimal(0)
            cumul_qt += pr_quantity
            cumul_qt_price += pr_quantity * (pr.settl_price or Decimal(0))
            if pr.id == self.id:
                break
        if cumul_qt > 0:
            price = cumul_qt_price / cumul_qt
        return round(price, 4)


class Trigger(ModelSQL, ModelView):
    "Period rules"
    __name__ = "pricing.trigger"

    component = fields.Many2One(
        'pricing.component', "Component", ondelete='CASCADE')
    pricing_period = fields.Many2One('pricing.period', "Pricing period")
    # ``!= None`` is intentional below: Eval() builds a PYSON expression
    # through the overloaded operator, ``is not None`` would not.
    from_p = fields.Date("From",
        states={
            'readonly': Eval('pricing_period') != None,  # noqa: E711
        })
    to_p = fields.Date("To",
        states={
            'readonly': Eval('pricing_period') != None,  # noqa: E711
        })
    average = fields.Boolean("Avg")
    last = fields.Boolean("Last")
    application_period = fields.Many2One(
        'pricing.period', "Application period")
    from_a = fields.Date("From",
        states={
            'readonly': Eval('application_period') != None,  # noqa: E711
        })
    to_a = fields.Date("To",
        states={
            'readonly': Eval('application_period') != None,  # noqa: E711
        })

    @fields.depends('pricing_period')
    def on_change_with_application_period(self):
        """Default the application period to the pricing period.

        An application period that is already set is returned unchanged;
        returning None here would wipe it on every pricing period change.
        """
        current = getattr(self, 'application_period', None)
        if not current and self.pricing_period:
            return self.pricing_period
        return current

    def getDateWithEstTrigger(self, period):
        """Resolve a period rule against the estimated trigger date.

        :param period: 1 for the pricing period, anything else for the
            application period.
        :return: ``(date_from, date_to, trigger_date, include, dates)``.
        """
        pp = self.pricing_period if period == 1 else self.application_period
        co = self.component
        # NOTE(review): ``line`` and the getEstimatedTrigger* helpers are
        # not defined on 'pricing.component' in this file; presumably
        # added by an extension -- confirm.
        if co.line:
            d = co.getEstimatedTriggerPurchase(pp.trigger)
        else:
            d = co.getEstimatedTriggerSale(pp.trigger)
        date_from, date_to, dates = pp.getDates(d)
        return date_from, date_to, d, pp.include, dates

    def getApplicationListDates(self, cal):
        """Return ``(dates, prices)`` for the application window.

        The window comes from the application period when set, else from
        the manual ``from_a``/``to_a`` range.  Prices are never computed
        for the application window, so the second list stays empty.
        """
        if self.application_period:
            date_from, date_to, d, include, dates = (
                self.getDateWithEstTrigger(2))
        else:
            date_from = self.from_a
            date_to = self.to_a
            d = None
            include = False
            # Manual range: no explicit list of dates.
            dates = []
        return self.getListDates(date_from, date_to, d, include, cal, 2, dates)

    def getPricingListDates(self, cal):
        """Return ``(dates, prices)`` for the pricing window.

        The window comes from the pricing period when set, else from the
        manual ``from_p``/``to_p`` range.
        """
        if self.pricing_period:
            date_from, date_to, d, include, dates = (
                self.getDateWithEstTrigger(1))
        else:
            date_from = self.from_p
            date_to = self.to_p
            d = None
            include = False
            # Manual range: no explicit list of dates.
            dates = []
        return self.getListDates(date_from, date_to, d, include, cal, 1, dates)

    def getListDates(self, df, dt, t, i, cal, pricing, dates):
        """Build the list of retained days and, optionally, their prices.

        :param df: first day of the range (date or datetime), may be None.
        :param dt: last day of the range (inclusive), may be None.
        :param t: trigger day; skipped from the range unless *i* is true.
        :param i: whether the trigger day is included.
        :param cal: 'price.calendar' record or id used to keep quotation
            days only; when empty every day is kept.
        :param pricing: 1 to also compute a price entry per retained day.
        :param dates: explicit days; when non-empty they replace the
            *df*..*dt* range.
        :return: ``(days, prices)``; range days are datetimes at midnight.
        """
        kept = []
        prices = []
        Calendar = Pool().get('price.calendar')
        if cal:
            cal = Calendar(cal)

        def keep(day):
            kept.append(day)
            if pricing == 1:
                prices.append(self.getprice(day))

        if dates:
            for day in dates:
                if not cal or cal.IsQuote(day):
                    keep(day)
            return kept, prices

        if df and dt:
            # Compare calendar days: a datetime never equals a plain date,
            # which would silently keep the trigger day.
            excluded = None if i else _as_date(t)
            current = datetime.datetime(df.year, df.month, df.day)
            end = datetime.datetime(dt.year, dt.month, dt.day)
            one_day = datetime.timedelta(days=1)
            while current <= end:
                if (current.date() != excluded
                        and (not cal or cal.IsQuote(current))):
                    keep(current)
                current += one_day
        return kept, prices

    def getprice(self, current_date):
        """Return the price entry of the component's curve at
        *current_date*.

        The unit and currency come from the component's purchase line
        when there is one, else from its sale line.
        """
        component = self.component
        # NOTE(review): ``line``/``sale_line`` are not defined on
        # 'pricing.component' in this file; presumably added elsewhere.
        origin = component.line if component.line else component.sale_line
        price = component.price_index.get_price(
            current_date, origin.unit, origin.currency, self.last)
        return {
            'date': current_date,
            'price': price,
            'avg': price,
            'avg_minus_1': price,
            'isAvg': self.average,
        }


class Period(ModelSQL, ModelView):
    "Period"
    __name__ = 'pricing.period'

    name = fields.Char("Name")
    trigger = fields.Selection(TRIGGERS, 'Trigger')
    include = fields.Boolean("Inc.")
    startday = fields.Selection(DAYTYPES, "Start day")
    nbds = fields.Integer("Nb")
    endday = fields.Selection(DAYTYPES, "End day")
    nbde = fields.Integer("Nb")
    nbms = fields.Integer("Starting month")
    nbme = fields.Integer("Ending month")
    every = fields.Selection(DAYS, "Every")
    nb_quotation = fields.Integer("Nb quotation")

    @classmethod
    def default_nbds(cls):
        return 0

    @classmethod
    def default_nbde(cls):
        return 0

    @classmethod
    def default_nbms(cls):
        return 0

    @classmethod
    def default_nbme(cls):
        return 0

    def getDates(self, t):
        """Derive the period from the trigger date *t*.

        With ``every`` set, an explicit list of weekly dates is built:
        all matching weekdays of the month of *t* for the 'delmonth'
        trigger, else ``nb_quotation`` occurrences after (positive) or
        before (negative) *t*.  Otherwise a ``date_from``/``date_to``
        range is computed from ``startday``/``endday``.

        :return: ``(date_from, date_to, dates)``; everything is empty
            when *t* is empty.
        :raises ValueError: when ``every`` is not a known weekday.
        """
        date_from = None
        date_to = None
        dates = []
        if not t:
            return date_from, date_to, dates

        if self.every:
            if self.every not in WEEKDAY_MAP:
                raise ValueError(f"Invalid day : '{self.every}'")
            self._fill_weekly_dates(t, WEEKDAY_MAP[self.every], dates)
        else:
            date_from = self._boundary(t, self.startday, self.nbds, self.nbms)
            if date_from is None:
                date_from = datetime.datetime(t.year, t.month, t.day)
            date_to = self._boundary(t, self.endday, self.nbde, self.nbme)
            if date_to is None:
                date_to = date_from
        return date_from, date_to, dates

    def _fill_weekly_dates(self, t, weekday_target, dates):
        """Append to *dates* the weekly occurrences of *weekday_target*."""
        week = datetime.timedelta(days=7)
        # An empty integer field must behave like zero, not raise.
        quotations = self.nb_quotation or 0

        if self.trigger == 'delmonth':
            first_day = t.replace(day=1)
            offset = (weekday_target - first_day.weekday()) % 7
            current = first_day + datetime.timedelta(days=offset)
            while current.month == t.month:
                dates.append(datetime.datetime(
                    current.year, current.month, current.day))
                current += week
        elif quotations > 0:
            offset = (weekday_target - t.weekday()) % 7
            current = t + datetime.timedelta(days=offset)
            while len(dates) < quotations:
                dates.append(datetime.datetime(
                    current.year, current.month, current.day))
                current += week
        elif quotations < 0:
            offset = (t.weekday() - weekday_target) % 7
            current = t - datetime.timedelta(days=offset)
            while len(dates) < -quotations:
                dates.append(datetime.datetime(
                    current.year, current.month, current.day))
                current -= week

    @staticmethod
    def _boundary(t, daytype, nb_days, nb_months):
        """Return the boundary of type *daytype* around *t*, or None when
        *daytype* is empty or unknown."""
        if daytype == 'before':
            return t - datetime.timedelta(days=nb_days or 0)
        if daytype == 'after':
            return t + datetime.timedelta(days=nb_days or 0)
        if daytype == 'first':
            # First day of the month shifted by nb_months, across years.
            return _month_start(t, nb_months or 0)
        if daytype == 'last':
            # Day before the first day of the following month.
            return _month_start(t, 1) - datetime.timedelta(days=1)
        if daytype == 'xth':
            return datetime.datetime(t.year, t.month, nb_days or 1)
        return None