# Tryton models and wizards used to compute value-at-risk (VaR) figures
# from daily performance series stored in ``risk.perf``.
import contextlib
import datetime
import io
import logging
import math
import warnings
from decimal import Decimal

import numpy as np
import pandas as pd
import stdnum.exceptions
from sql import Column, Literal
from sql.aggregate import Avg, BoolOr, Count, Max, Min, Sum
from sql.conditionals import Case
from sql.functions import CharLength, CurrentTimestamp, DateTrunc, Extract
from stdnum import get_cc_module
from trytond.i18n import gettext
from trytond.model import (
    DeactivableMixin, Index, ModelSQL, ModelView, MultiValueMixin, Unique,
    ValueMixin, convert_from, fields, sequence_ordered)
from trytond.model.exceptions import AccessError
from trytond.pool import Pool
from trytond.pyson import Bool, Eval
from trytond.tools import is_full_text, lstrip_wildcard
from trytond.transaction import Transaction, inactive_records
from trytond.wizard import Button, StateTransition, StateView, Wizard
from var import VaR, load_data

logger = logging.getLogger(__name__)

VARTYPE = [
    ('historical', 'Historical'),
    ('parametric', 'Parametric'),
    ('montecarlo', 'Monté Carlo'),
    ('garch', 'GARCH'),
]

# NOTE(review): 'exponpaw' and 'chauchy' look like misspellings of
# 'exponpow' and 'cauchy'. The keys are stored values and are forwarded as
# ``distribution=`` to ``VaR`` (see VarExecute), so confirm what that
# library expects and migrate existing rows before renaming them.
VARMODEL = [
    (None, ''),
    ('norm', 'Normal'),
    ('lognorm', 'Log Normal'),
    ('t', 'T student'),
    ('uniform', 'Uniform'),
    ('expon', 'Exponential'),
    ('exponpaw', 'Exp. Pow'),
    ('powerlaw', 'Power law'),
    ('rayleigh', 'Rayleigh'),
    ('chauchy', 'Cauchy'),
    ('gamma', 'Gamma'),
    ('chi2', 'Chi2'),
    ('f', 'F'),
    ('gumbel_r', 'Gumbel_r'),
]


def _to_decimal(value):
    """Convert a numeric result to ``Decimal`` for storage in a Numeric field.

    ``None``, NaN and infinite values are mapped to ``None`` so that they are
    stored as empty values instead of failing on save.
    """
    if value is None:
        return None
    number = float(value)
    if not math.isfinite(number):
        return None
    # Go through ``str`` to get the short decimal representation of the float.
    return Decimal(str(number))


class Var(ModelSQL, ModelView):
    "Value at risk"
    __name__ = 'risk.var'

    name = fields.Char("Name")
    horizon = fields.Integer("Time horizon")
    confidence = fields.Numeric("Confidence")
    type = fields.Selection(VARTYPE, "Type")
    from_date = fields.Date("From")
    to_date = fields.Date("To")
    nb_days = fields.Integer("Nb days")
    lines = fields.One2Many('risk.var.line', 'var', "Lines")
    values = fields.One2Many('risk.var.values', 'var', "Values")
    product = fields.Many2One('product.product', "Product")
    party = fields.Many2One('party.party', "Trader")
    execute = fields.Boolean("Schedule to position")
    result = fields.Text("Result")
    market = fields.Selection([
        (None, ''),
        ("Coffee ICE US", "Coffee ICE US"),
        ("Cocoa ICE US", "Cocoa ICE US"),
        ("Coffee ICE EU", "Coffee ICE EU"),
        ("Cocoa ICE EU", "Cocoa ICE EU")
        ], "Market")
    model = fields.Selection(VARMODEL, "Model")
    # Results written by the "Execute VaR" wizard, one triple per method.
    var_his = fields.Numeric("Historical")
    var_par = fields.Numeric("Parametric")
    var_mc = fields.Numeric("Monte Carlo")
    var_gar = fields.Numeric("GARCH")
    cvar_his = fields.Numeric("")
    cvar_par = fields.Numeric("")
    cvar_mc = fields.Numeric("")
    cvar_gar = fields.Numeric("")
    cdar_his = fields.Numeric("")
    cdar_par = fields.Numeric("")
    cdar_mc = fields.Numeric("")
    cdar_gar = fields.Numeric("")

    @classmethod
    def default_model(cls):
        return 'norm'


class VarLine(ModelSQL, ModelView):
    "VaR line"
    __name__ = 'risk.var.line'

    var = fields.Many2One('risk.var', "Var")
    curve = fields.Many2One('price.price', "Curve")
    model = fields.Selection(VARMODEL, "Model")
    weight = fields.Numeric("Weight")
    nb_days = fields.Integer("Backtest days")
    nb_days_past = fields.Integer("Historical days")
    mean = fields.Numeric("Mean")
    std = fields.Numeric("Std")
    simul_date = fields.Date("Date")
    dfn = fields.Numeric("Dfn")
    # Was declared as a second ``dfn``, which silently replaced the "Dfn"
    # field above; the "Dfd" label shows a distinct field was intended.
    dfd = fields.Numeric("Dfd")
    perf = fields.One2Many('risk.perf', 'line', "Perf")
    perf_comput = fields.Function(
        fields.One2Many('risk.perf', 'line', "VaR computation",
            order=[
                ('date', 'ASC'),
                ],),
        'get_perfs')
    perf_past = fields.Function(
        fields.One2Many('risk.perf', 'line', "Historical data",
            order=[
                ('date', 'ASC'),
                ],),
        'get_perfs_past')
    fit_text = fields.Text("Fit result")
    fit_text_dyn = fields.Function(fields.Text("Fit result"), 'get_fit_info')
    fit = fields.Boolean("Fit distribution")
    simul_comput = fields.Selection(VARMODEL, "Simulate with")
    gen_simul_comput = fields.Boolean("Generate")
    simul_past = fields.Selection(VARMODEL, "Simulate with")
    gen_simul_past = fields.Boolean("Generate")
    link = fields.Boolean("Link position")
    month = fields.Char("Month")
    future_market = fields.Char("Market")
    whatif = fields.Numeric("What if (%)")
    simul_return = fields.Numeric("Return")
    simul_return_past = fields.Numeric("Return")

    @classmethod
    def default_simul_return(cls):
        # Numeric fields hold Decimal values, so the default is one too.
        return Decimal('0.2')

    @classmethod
    def default_simul_return_past(cls):
        return Decimal('0.2')

    @classmethod
    def default_nb_days(cls):
        return 100

    @classmethod
    def default_nb_days_past(cls):
        return 100

    @classmethod
    def default_link(cls):
        return True

    @classmethod
    def default_simul_date(cls):
        Date = Pool().get('ir.date')
        return Date.today()

    def get_fit_info(self, name):
        """Getter of ``fit_text_dyn``: mirror the stored fit report."""
        return self.fit_text

    def get_perfs_ordered(self, from_date, to_date):
        """Return this line's performances dated within
        ``[from_date, to_date]`` (both inclusive), oldest first."""
        Perf = Pool().get('risk.perf')
        return list(Perf.search([
            ('line', '=', self.id),
            ('date', '>=', from_date),
            ('date', '<=', to_date),
            ], order=[('date', 'ASC')]))

    def get_perfs_past(self, name):
        """Getter of ``perf_past``: the ``nb_days_past`` days that precede
        the backtest window."""
        if not self.simul_date:
            # Without a reference date there is no window to display.
            return []
        window_end = self.simul_date - datetime.timedelta(
            days=self.nb_days or 0)
        window_start = window_end - datetime.timedelta(
            days=self.nb_days_past or 0)
        return self.get_perfs_ordered(window_start, window_end)

    def get_perfs(self, name):
        """Getter of ``perf_comput``: the last ``nb_days`` days up to
        ``simul_date``."""
        if not self.simul_date:
            return []
        window_start = self.simul_date - datetime.timedelta(
            days=self.nb_days or 0)
        return self.get_perfs_ordered(window_start, self.simul_date)

    @staticmethod
    def _draw_daily_returns(model, total_return, num_days):
        """Draw ``num_days`` random daily returns for the selected model.

        The total return is spread evenly over the period. All arithmetic is
        done in floats because ``total_return`` comes from a Numeric field
        and Decimal values do not combine with numpy arrays.
        """
        daily_mean = float(total_return or 0) / num_days
        # The selection key is 'lognorm'; 'lognormal' is kept for backward
        # compatibility with the previous (never matching) comparison.
        if model in ('lognorm', 'lognormal'):
            # NOTE(review): lognormal draws with a small mean are centred
            # near 1, not near 0 -- confirm this is the intended scale.
            return np.random.lognormal(
                mean=daily_mean, sigma=0.01, size=num_days)
        if model == 't':
            return np.random.standard_t(df=10, size=num_days) * daily_mean
        # 'norm' and every other model fall back to a normal distribution.
        return np.random.normal(loc=daily_mean, scale=0.01, size=num_days)

    @classmethod
    def _generate_perfs(cls, line, model, end_date, num_days, total_return,
            delete_before=None):
        """Replace the simulated performances of ``line`` for the
        ``num_days`` days that precede ``end_date``.

        Existing rows dated from the start of the period are deleted first;
        ``delete_before`` optionally bounds that deletion (exclusive).
        Nothing is done when the period is undefined or empty.
        """
        if not end_date or not num_days or num_days < 0:
            return
        Perf = Pool().get('risk.perf')
        start_date = end_date - datetime.timedelta(days=num_days)
        daily_returns = cls._draw_daily_returns(
            model, total_return, num_days)

        domain = [('line', '=', line.id), ('date', '>=', start_date)]
        if delete_before is not None:
            domain.append(('date', '<', delete_before))
        Perf.delete(Perf.search(domain))

        generated = []
        for offset, daily_return in enumerate(daily_returns):
            perf = Perf()
            perf.line = line.id
            perf.price_index = line.curve
            perf.date = start_date + datetime.timedelta(days=offset)
            perf.d_perf = _to_decimal(daily_return)
            generated.append(perf)
        Perf.save(generated)

    @classmethod
    def write(cls, *args):
        """Write the lines, then generate simulated performances for every
        line whose "Generate" flags are set and reset those flags."""
        super(VarLine, cls).write(*args)
        to_write = []
        varlines = sum(args[::2], [])
        for vl in varlines:
            reset = {}
            if vl.gen_simul_comput:
                # Backtest window: the ``nb_days`` days before simul_date.
                cls._generate_perfs(
                    vl, vl.simul_comput, vl.simul_date, vl.nb_days,
                    vl.simul_return)
                reset['gen_simul_comput'] = False
            if vl.gen_simul_past:
                # Historical window: ends where the backtest window starts.
                end_date = None
                if vl.simul_date:
                    end_date = vl.simul_date - datetime.timedelta(
                        days=vl.nb_days or 0)
                cls._generate_perfs(
                    vl, vl.simul_past, end_date, vl.nb_days_past,
                    vl.simul_return_past, delete_before=end_date)
                reset['gen_simul_past'] = False
            if reset:
                to_write.extend(([vl], reset))
        if to_write:
            # The flags are False afterwards, so this nested call does not
            # trigger another generation.
            cls.write(*to_write)

    @classmethod
    def validate(cls, varlines):
        super(VarLine, cls).validate(varlines)


class Perf(ModelSQL, ModelView):
    "Perf"
    __name__ = 'risk.perf'

    line = fields.Many2One('risk.var.line', "Lines")
    price_index = fields.Many2One('price.price', "Curve")
    date = fields.Date("Date")
    d_perf = fields.Numeric("Perf")
    d_up = fields.Numeric("VaR")
    d_down = fields.Numeric("Result")
    d_es = fields.Numeric("ES")
    d_drop = fields.Numeric("CDaR")
    d_mean = fields.Numeric("Mean")
    d_std = fields.Numeric("Std")


class VarValue(ModelSQL, ModelView):
    "VaR value"
    __name__ = 'risk.var.values'

    var = fields.Many2One('risk.var', "Var")
    product = fields.Many2One('product.product', "Product")
    party = fields.Many2One('party.party', "Trader")
    r_var = fields.Numeric("VaR")
    r_es = fields.Numeric("ES")
    r_cdar = fields.Numeric("CDaR")


class VarExecute(Wizard):
    "Execute VaR"
    __name__ = "risk.var.execute"

    start = StateTransition()

    def get_var(self, type, var):
        """Return the first three values of the first result row of the
        selected method of ``var``.

        ``type`` is one of the ``VARTYPE`` keys; any other value selects the
        GARCH method. Callers store the three values as VaR, ES and CDaR.
        The method is evaluated once so that the three values come from the
        same run.
        """
        if type == 'historical':
            result = var.historic()
        elif type == 'parametric':
            result = var.parametric()
        elif type == 'montecarlo':
            result = var.monte_carlo()
        else:
            result = var.garch()
        # Purely positional access: the labels of the result are not relied
        # upon.
        first_row = result.iloc[0]
        return first_row.iloc[0], first_row.iloc[1], first_row.iloc[2]

    def _update_positions(self, rv, line, perf):
        """Replace the VaR figures attached to the open positions matching
        the market and month of ``line`` with those of ``perf``."""
        pool = Pool()
        Pos = pool.get('risk.position')
        PosVar = pool.get('risk.position.var')
        positions = Pos.search([
            ('future_market', '=', line.future_market),
            ('quantity', '>', 0),
            ('month', '=', line.month),
            ('pricing_type', '!=', 'Priced'),
            ('ct_type', '!=', 'Inventory'),
            ])
        if not positions:
            return
        PosVar.delete(PosVar.search([
            ('position', 'in', [e.id for e in positions]),
            ]))
        whatif = (1 + line.whatif / 100) if line.whatif else 1
        position_vars = []
        for position in positions:
            pv = PosVar()
            pv.var = rv.id
            pv.position = position.id
            pv.d_var = perf.d_up
            pv.d_cvar = perf.d_es
            pv.d_cdar = perf.d_drop
            pv.whatif = whatif
            position_vars.append(pv)
        PosVar.save(position_vars)

    def _compute_line(self, rv, line):
        """Compute the rolling statistics and VaR figures of one line.

        Only the last ``nb_days_past + nb_days`` observations up to
        ``simul_date`` are processed: the first ``nb_days_past`` of them get
        an expanding mean/std, the following ones get VaR/ES/CDaR and the
        cumulated result. Returns the ``VaR`` object built for the last
        observation, or ``None`` when none was built.
        """
        pool = Pool()
        Perf = pool.get('risk.perf')
        Line = pool.get('risk.var.line')

        perfs = Perf.search([
            ('line', '=', line.id),
            ('date', '<=', line.simul_date),
            ], order=[('date', 'ASC')])
        if not perfs:
            return None

        data = [(e.date, float(e.d_perf)) for e in perfs]
        logger.debug("DATA:%s", data)
        df = pd.DataFrame(data, columns=['Date', 'd_perf'])
        df.set_index('Date', inplace=True)
        weights = np.array([1.0])
        alpha = float(1 - rv.confidence)

        nb_days_past = line.nb_days_past or 0
        window = (line.nb_days or 0) + nb_days_past
        # Index of the first observation of the processed window.
        start = max(len(df) - window, 0)
        # Index of the first observation that gets a VaR figure.
        first_var_index = start + nb_days_past
        last_index = len(perfs) - 1

        cumul = 0.0
        last_var = None
        to_save = []
        for index, p in enumerate(perfs):
            if index < start:
                continue
            if index < first_var_index:
                # Warm-up period: statistics of the window observations
                # that precede the current one.
                history = df['d_perf'].iloc[start:index]
                logger.debug("PPPPP:%s", p)
                p.d_mean = _to_decimal(history.mean())
                p.d_std = _to_decimal(history.std(ddof=1))
                to_save.append(p)
                continue

            # NOTE(review): the slice stops at ``index - 1`` (exclusive), so
            # the observation just before the current one is left out, and
            # a negative end (nb_days_past == 0 on a short series) counts
            # from the end of the series -- confirm this is intended.
            subset_df = df.iloc[start:index - 1]
            with warnings.catch_warnings():
                warnings.simplefilter('ignore', UserWarning)
                var = VaR(subset_df, weights, alpha=alpha,
                    distribution=line.model)
                # Fit the distributions once, on the first VaR window, when
                # the option is ticked; the printed report is captured.
                if index == first_var_index and line.fit:
                    output = io.StringIO()
                    with contextlib.redirect_stdout(output):
                        var.fit_distributions(
                            include_other=True, verbose=True)
                    line.fit_text = output.getvalue()
                    Line.save([line])
                d_up, d_es, d_drop = self.get_var(rv.type, var)

            p.d_up = _to_decimal(d_up)
            p.d_es = _to_decimal(d_es)
            p.d_drop = _to_decimal(d_drop)
            if p.d_up:
                # Compare in floats: Decimal and numpy floats cannot be
                # subtracted from one another.
                perf_value = float(p.d_perf)
                var_value = float(p.d_up)
                if perf_value < 0 and var_value > perf_value:
                    cumul += abs(perf_value) - abs(var_value)
            logger.debug("PPPPP2:%s", p)
            p.d_down = _to_decimal(cumul)
            to_save.append(p)

            if index == last_index:
                last_var = var
                # Push the latest figures to the positions when requested.
                if rv.execute:
                    self._update_positions(rv, line, p)

        Perf.save(to_save)
        return last_var

    def transition_start(self):
        """Run the computation for every line of the selected VaR record and
        store the figures of each method from the last ``VaR`` object
        built."""
        Var = Pool().get('risk.var')
        rv = self.records[0]

        last_var = None
        for line in rv.lines:
            line_var = self._compute_line(rv, line)
            if line_var is not None:
                last_var = line_var

        # Nothing to store when no line had enough data to build a VaR.
        if last_var is not None:
            v = Var(rv.id)
            for method, suffix in (
                    ('historical', 'his'),
                    ('parametric', 'par'),
                    ('montecarlo', 'mc'),
                    ('garch', 'gar')):
                d_up, d_es, d_drop = self.get_var(method, last_var)
                setattr(v, 'var_' + suffix, _to_decimal(d_up))
                setattr(v, 'cvar_' + suffix, _to_decimal(d_es))
                setattr(v, 'cdar_' + suffix, _to_decimal(d_drop))
            Var.save([v])
        return 'end'


class VarGetPos(Wizard):
    "Get position"
    __name__ = "risk.var.get"

    start = StateTransition()

    def get_curve(self, month, market):
        """Return the id of the first price curve matching ``market`` and
        the period named ``month``, or ``None`` when there is none."""
        pool = Pool()
        Price = pool.get('price.price')
        PM = pool.get('product.month')
        periods = PM.search([('month_name', '=', month)])
        if not periods:
            return None
        prices = Price.search([
            ('price_desc', '=', market),
            ('price_period', '=', periods[0]),
            ])
        return prices[0].id if prices else None

    def transition_start(self):
        """Create one VaR line per position of the selected market (or of
        every market when none is selected)."""
        pool = Pool()
        Pos = pool.get('risk.position.process')
        VarLine = pool.get('risk.var.line')
        rv = self.records[0]

        domain = [('future_market', '=', rv.market)] if rv.market else []
        positions = Pos.search(domain)
        if positions:
            tot_quantity = sum((e.quantity or 0) for e in positions)
            logger.info("TOTQAUNTITY:%s", tot_quantity)
            # NOTE: ``weight`` is not populated here; the total quantity is
            # only logged.
            new_lines = []
            for position in positions:
                vl = VarLine()
                vl.var = rv.id
                vl.month = position.month
                vl.future_market = position.future_market
                vl.model = rv.model or 'norm'
                vl.curve = self.get_curve(
                    position.month, position.future_market)
                new_lines.append(vl)
            VarLine.save(new_lines)
        return 'end'