Files
tradon/modules/risk/myvar.py
2025-12-26 13:11:43 +00:00

429 lines
17 KiB
Python
Executable File

import stdnum.exceptions
from sql import Column, Literal
import datetime
from sql.aggregate import Count, Max, Min, Sum, Avg, BoolOr
from sql.conditionals import Case
from stdnum import get_cc_module
from sql.functions import CharLength, CurrentTimestamp, DateTrunc, Extract
from trytond.i18n import gettext
from trytond.model import (
DeactivableMixin, Index, ModelSQL, ModelView, MultiValueMixin, Unique,
ValueMixin, convert_from, fields, sequence_ordered)
from trytond.model.exceptions import AccessError
from trytond.pool import Pool
from trytond.pyson import Bool, Eval
from trytond.tools import is_full_text, lstrip_wildcard
from trytond.transaction import Transaction, inactive_records
from trytond.wizard import Button, StateTransition, StateView, Wizard
from decimal import Decimal
from var import VaR, load_data
import numpy as np
import pandas as pd
import math
import logging
import datetime
import io
import contextlib
import warnings
logger = logging.getLogger(__name__)
VARTYPE = [
('historical', 'Historical'),
('parametric', 'Parametric'),
('montecarlo', 'Monté Carlo'),
('garch', 'GARCH'),
]
VARMODEL = [
(None, ''),
('norm', 'Normal'),
('lognorm', 'Log Normal'),
('t', 'T student'),
('uniform', 'Uniform'),
('expon', 'Exponential'),
('exponpaw', 'Exp. Pow'),
('powerlaw', 'Power law'),
('rayleigh', 'Rayleigh'),
('chauchy', 'Cauchy'),
('gamma', 'Gamma'),
('chi2', 'Chi2'),
('f', 'F'),
('gumbel_r', 'Gumbel_r'),
]
class Var(ModelSQL, ModelView):
"Value at risk"
__name__ = 'risk.var'
name = fields.Char("Name")
horizon = fields.Integer("Time horizon")
confidence = fields.Numeric("Confidence")
type = fields.Selection(VARTYPE,"Type")
from_date = fields.Date("From")
to_date = fields.Date("To")
nb_days = fields.Integer("Nb days")
lines = fields.One2Many('risk.var.line','var',"Lines")
values = fields.One2Many('risk.var.values','var',"Values")
product = fields.Many2One('product.product',"Product")
party = fields.Many2One('party.party',"Trader")
execute = fields.Boolean("Schedule to position")
result = fields.Text("Result")
market = fields.Selection([
(None, ''),
("Coffee ICE US", "Coffee ICE US"),
("Cocoa ICE US", "Cocoa ICE US"),
("Coffee ICE EU", "Coffee ICE EU"),
("Cocoa ICE EU", "Cocoa ICE EU")
],"Market")
model = fields.Selection(VARMODEL,"Model")
var_his = fields.Numeric("Historical")
var_par = fields.Numeric("Parametric")
var_mc = fields.Numeric("Monte Carlo")
var_gar = fields.Numeric("GARCH")
cvar_his = fields.Numeric("")
cvar_par = fields.Numeric("")
cvar_mc = fields.Numeric("")
cvar_gar = fields.Numeric("")
cdar_his = fields.Numeric("")
cdar_par = fields.Numeric("")
cdar_mc = fields.Numeric("")
cdar_gar = fields.Numeric("")
@classmethod
def default_model(cls):
return 'norm'
class VarLine(ModelSQL, ModelView):
"VaR line"
__name__ = 'risk.var.line'
var = fields.Many2One('risk.var',"Var")
curve = fields.Many2One('price.price',"Curve")
model = fields.Selection(VARMODEL,"Model")
weight = fields.Numeric("Weight")
nb_days = fields.Integer("Backtest days")
nb_days_past = fields.Integer("Historical days")
mean = fields.Numeric("Mean")
std = fields.Numeric("Std")
simul_date = fields.Date("Date")
dfn = fields.Numeric("Dfn")
dfn = fields.Numeric("Dfd")
perf = fields.One2Many('risk.perf','line',"Perf")
perf_comput = fields.Function(fields.One2Many('risk.perf','line',
"VaR computation",
order=[
('date', 'ASC'),
],),'get_perfs')
perf_past = fields.Function(fields.One2Many('risk.perf','line',
"Historical data",
order=[
('date', 'ASC'),
],),'get_perfs_past')
fit_text = fields.Text("Fit result")
fit_text_dyn = fields.Function(fields.Text("Fit result"),'get_fit_info')
fit = fields.Boolean("Fit distribution")
simul_comput = fields.Selection(VARMODEL,"Simulate with")
gen_simul_comput = fields.Boolean("Generate")
simul_past = fields.Selection(VARMODEL,"Simulate with")
gen_simul_past = fields.Boolean("Generate")
link = fields.Boolean("Link position")
month = fields.Char("Month")
future_market = fields.Char("Market")
whatif = fields.Numeric("What if (%)")
simul_return = fields.Numeric("Return")
simul_return_past = fields.Numeric("Return")
@classmethod
def default_simul_return(cls):
return 0.2
@classmethod
def default_simul_return_past(cls):
return 0.2
@classmethod
def default_nb_days(cls):
return 100
@classmethod
def default_nb_days_past(cls):
return 100
@classmethod
def default_link(cls):
return True
@classmethod
def default_simul_date(cls):
Date = Pool().get('ir.date')
return Date.today()
def get_fit_info(self,name):
return self.fit_text
def get_perfs_ordered(self,from_date,to_date):
perfs = []
Perf = Pool().get('risk.perf')
perfs.extend(Perf.search([('line','=',self.id),('date','>=',from_date),('date','<=',to_date)],order=[('date','ASC')]))
return perfs
def get_perfs_past(self, name):
pastdate = self.simul_date - datetime.timedelta(days=(self.nb_days_past+self.nb_days))
pastdate2 = self.simul_date - datetime.timedelta(days=self.nb_days)
return self.get_perfs_ordered(pastdate,pastdate2)
def get_perfs(self, name):
pastdate = self.simul_date - datetime.timedelta(days=self.nb_days)
return self.get_perfs_ordered(pastdate,self.simul_date)
@classmethod
def write(cls, *args):
super(VarLine, cls).write(*args)
to_write = []
varlines = sum(args[::2], [])
for vl in varlines:
if vl.gen_simul_comput:
end_date = vl.simul_date
num_days = vl.nb_days
start_date = end_date - datetime.timedelta(num_days)
total_return = vl.simul_return
if vl.simul_comput == 'norm':
daily_returns = np.random.normal(loc=total_return / num_days, scale=0.01, size=num_days)
elif vl.simul_comput == 'lognormal':
daily_returns = np.random.lognormal(mean=total_return / num_days, sigma=0.01, size=num_days)
elif vl.simul_comput == 't':
daily_returns = np.random.standard_t(df=10, size=num_days) * (total_return / num_days)
else:
daily_returns = np.random.normal(loc=total_return / num_days, scale=0.01, size=num_days)
Perf = Pool().get('risk.perf')
to_delete = Perf.search([('line','=',vl.id),('date','>=',start_date)])
Perf.delete(to_delete)
for i in range(num_days):
p = Perf()
p.line = vl.id
p.price_index = vl.curve
p.date = start_date + datetime.timedelta(days=i)
p.d_perf = daily_returns[i]
Perf.save([p])
to_write.extend(([vl], {
'gen_simul_comput': False,
}))
if vl.gen_simul_past:
end_date = vl.simul_date - datetime.timedelta(vl.nb_days)
num_days = vl.nb_days_past
start_date = end_date - datetime.timedelta(num_days)
total_return = vl.simul_return_past
if vl.simul_past == 'norm':
daily_returns = np.random.normal(loc=total_return / num_days, scale=0.01, size=num_days)
elif vl.simul_past == 'lognormal':
daily_returns = np.random.lognormal(mean=total_return / num_days, sigma=0.01, size=num_days)
elif vl.simul_past == 't':
daily_returns = np.random.standard_t(df=10, size=num_days) * (total_return / num_days)
else:
daily_returns = np.random.normal(loc=total_return / num_days, scale=0.01, size=num_days)
Perf = Pool().get('risk.perf')
to_delete = Perf.search([('line','=',vl.id),('date','>=',start_date),('date','<',end_date)])
Perf.delete(to_delete)
for i in range(num_days):
p = Perf()
p.line = vl.id
p.price_index = vl.curve
p.date = start_date + datetime.timedelta(days=i)
p.d_perf = daily_returns[i]
Perf.save([p])
to_write.extend(([vl], {
'gen_simul_past': False,
}))
if to_write:
cls.write(*to_write)
@classmethod
def validate(cls, varlines):
super(VarLine, cls).validate(varlines)
class Perf(ModelSQL,ModelView):
"Perf"
__name__ = 'risk.perf'
line = fields.Many2One('risk.var.line',"Lines")
price_index = fields.Many2One('price.price',"Curve")
date = fields.Date("Date")
d_perf = fields.Numeric("Perf")
d_up = fields.Numeric("VaR")
d_down = fields.Numeric("Result")
d_es = fields.Numeric("ES")
d_drop = fields.Numeric("CDaR")
d_mean = fields.Numeric("Mean")
d_std = fields.Numeric("Std")
class VarValue(ModelSQL, ModelView):
"VaR value"
__name__ = 'risk.var.values'
var = fields.Many2One('risk.var',"Var")
product = fields.Many2One('product.product',"Product")
party = fields.Many2One('party.party',"Trader")
r_var = fields.Numeric("VaR")
r_es = fields.Numeric("ES")
r_cdar = fields.Numeric("CDaR")
class VarExecute(Wizard):
"Execute VaR"
__name__ = "risk.var.execute"
start = StateTransition()
def get_var(self,type,var):
if type == 'historical':
return var.historic().iloc[0][0],var.historic().iloc[0][1],var.historic().iloc[0][2]
elif type == 'parametric':
return var.parametric().iloc[0][0],var.parametric().iloc[0][1],var.parametric().iloc[0][2]
elif type == 'montecarlo':
return var.monte_carlo().iloc[0][0],var.monte_carlo().iloc[0][1],var.monte_carlo().iloc[0][2]
else:
return var.garch().iloc[0][0],var.garch().iloc[0][1],var.garch().iloc[0][2]
def transition_start(self):
rv = self.records[0]
last_var = None
for l in rv.lines:
Perf = Pool().get('risk.perf')
perf = Perf.search([('line','=',l.id),('date','<=',l.simul_date)],order=[('date','ASC')])
if perf:
data = [(e.date, float(e.d_perf)) for e in perf]
logger.info("DATA:%s",data)
df = pd.DataFrame(data, columns=['Date', 'd_perf'])
df.set_index('Date', inplace=True)
weights = np.array([1.0])
cumul = 0
if len(df) > (l.nb_days + l.nb_days_past):
start = len(df) - (l.nb_days + l.nb_days_past)
else:
start = 0
i = start + l.nb_days_past
for index, p in enumerate(perf):
if index < start:
continue
if index < start + l.nb_days_past:
sub_df = df.iloc[start:(start+index)]
sub_df['d_perf'] = sub_df['d_perf'].astype(float)
logger.info("PPPPP:%s",p)
p.d_mean = np.mean(sub_df)
p.d_std = np.std(sub_df,ddof=1).loc['d_perf']
if math.isnan(p.d_mean):
p.d_mean = None
if math.isnan(p.d_std):
p.d_std = None
Perf.save([p])
continue
subset_df = df.iloc[start:i-1]
with warnings.catch_warnings():
warnings.simplefilter('ignore', UserWarning)
var = VaR(subset_df, weights, alpha=float(1-rv.confidence), distribution=l.model)
#fit distribution if option ticked
if index == start + l.nb_days_past and l.fit:
output = io.StringIO()
with contextlib.redirect_stdout(output):
var.fit_distributions(include_other=True, verbose=True)
l.fit_text = output.getvalue()
Line = Pool().get('risk.var.line')
Line.save([l])
p.d_up,p.d_es,p.d_drop = self.get_var(rv.type, var)
if math.isnan(p.d_up):
p.d_up = None
if math.isnan(p.d_es):
p.d_es = None
if math.isnan(p.d_drop):
p.d_drop = None
if p.d_up:
if p.d_perf < 0 and p.d_up > p.d_perf:
cumul += abs(p.d_perf) - abs(p.d_up)
logger.info("PPPPP2:%s",p)
p.d_down = cumul
Perf.save([p])
i+=1
#update position if execute
if index == len(perf)-1:
last_var = var
if self.records[0].execute:
Pos = Pool().get('risk.position')
pos = Pos.search([('future_market','=',l.future_market),('quantity','>',0),('month','=',l.month),('pricing_type','!=','Priced'),('ct_type','!=','Inventory')])
if pos:
PosVar = Pool().get('risk.position.var')
posvar = PosVar.search([('position','in',[e.id for e in pos])])
PosVar.delete(posvar)
for po in pos:
pv = PosVar()
pv.var = self.records[0].id
pv.position = po.id
pv.d_var = p.d_up
pv.d_cvar = p.d_es
pv.d_cdar = p.d_drop
pv.whatif = (1 + l.whatif/100) if l.whatif else 1
PosVar.save([pv])
Var = Pool().get('risk.var')
v = Var(rv)
d_up,d_es,d_drop = self.get_var('historical', last_var)
v.var_his = d_up
v.cvar_his = d_es
v.cdar_his = d_drop
d_up,d_es,d_drop = self.get_var('parametric', last_var)
v.var_par = d_up
v.cvar_par = d_es
v.cdar_par = d_drop
d_up,d_es,d_drop = self.get_var('montecarlo', last_var)
v.var_mc = d_up
v.cvar_mc = d_es
v.cdar_mc = d_drop
d_up,d_es,d_drop = self.get_var('garch', last_var)
v.var_gar = d_up
v.cvar_gar = d_es
v.cdar_gar = d_drop
Var.save([v])
return 'end'
class VarGetPos(Wizard):
"Get position"
__name__ = "risk.var.get"
start = StateTransition()
def get_curve(self,month,market):
Price = Pool().get('price.price')
PM = Pool().get('product.month')
pm = PM.search([('month_name','=',month)])
if pm:
price = Price.search([('price_desc','=',market),('price_period','=',pm[0])])
if price:
return price[0].id
def transition_start(self):
rv = self.records[0]
Pos = Pool().get('risk.position.process')
if rv.market:
pos = Pos.search([('future_market','=',rv.market)])
else:
pos = Pos.search([])
if pos:
VarLine = Pool().get('risk.var.line')
tot_quantity = sum([e.quantity for e in pos])
logger.info("TOTQAUNTITY:%s",tot_quantity)
for p in pos:
vl = VarLine()
vl.var = rv.id
vl.month = p.month
vl.future_market = p.future_market
if rv.model:
vl.model = rv.model
else:
vl.model = 'norm'
vl.curve = self.get_curve(p.month,p.future_market)
# if tot_quantity > 0:
# vl.weight = p.quantity / tot_quantity
VarLine.save([vl])
return 'end'