Skip to content

Instantly share code, notes, and snippets.

@39mikufans
Last active October 14, 2020 06:23
Show Gist options
  • Save 39mikufans/9c67e29001c0347f86b978445e0d6b3a to your computer and use it in GitHub Desktop.
Save 39mikufans/9c67e29001c0347f86b978445e0d6b3a to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
import abc
from math import exp
import math
import numbers
import time
import numpy as np
import scipy.optimize
import scipy.stats
# Copyright (c) <2020> <TouhouSupport(Ameame)>
# "Anti 996" License Version 1.0 (Draft)
# Permission is hereby granted to any individual or legal entity
# obtaining a copy of this licensed work (including the source code,
# documentation and/or related items, hereinafter collectively referred
# to as the "licensed work"), free of charge, to deal with the licensed
# work for any purpose, including without limitation, the rights to use,
# reproduce, modify, prepare derivative works of, distribute, publish
# and sublicense the licensed work, subject to the following conditions:
# 1. The individual or the legal entity must conspicuously display,
# without modification, this License and the notice on each redistributed
# or derivative copy of the Licensed Work.
# 2. The individual or the legal entity must strictly comply with all
# applicable laws, regulations, rules and standards of the jurisdiction
# relating to labor and employment where the individual is physically
# located or where the individual was born or naturalized; or where the
# legal entity is registered or is operating (whichever is stricter). In
# case that the jurisdiction has no such laws, regulations, rules and
# standards or its laws, regulations, rules and standards are
# unenforceable, the individual or the legal entity are required to
# comply with Core International Labor Standards.
# 3. The individual or the legal entity shall not induce, suggest or force
# its employee(s), whether full-time or part-time, or its independent
# contractor(s), in any methods, to agree in oral or written form, to
# directly or indirectly restrict, weaken or relinquish his or her
# rights or remedies under such laws, regulations, rules and standards
# relating to labor and employment as mentioned above, no matter whether
# such written or oral agreements are enforceable under the laws of the
# said jurisdiction, nor shall such individual or the legal entity
# limit, in any methods, the rights of its employee(s) or independent
# contractor(s) from reporting or complaining to the copyright holder or
# relevant authorities monitoring the compliance of the license about
# its violation(s) of the said license.
# THE LICENSED WORK IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN ANY WAY CONNECTION WITH THE
# LICENSED WORK OR THE USE OR OTHER DEALINGS IN THE LICENSED WORK.
def enum(**enums):
return type('Enum', (), enums)
OptionType = enum(CALL='call', PUT='put')
OptionExerciseType = enum(EUROPEAN='european', AMERICAN='american')
OptionModel = enum(BLACK_SCHOLES='black_scholes', BINOMIAL_TREE='binomial_tree', MONTE_CARLO='monte_carlo')
OptionMeasure = enum(VALUE='value', DELTA='delta', THETA='theta', RHO='rho', VEGA='vega', GAMMA='gamma')
# you can change steps to get a more precise
DEFAULT_BINOMIAL_TREE_NUM_STEPS = 25
DEFAULT_MONTE_CARLO_NUM_STEPS = 50
DEFAULT_MONTE_CARLO_NUM_PATHS = 100
class Instrument(object):
@abc.abstractmethod
def run_model(self):
"""Calculate measures (i.e. theoretical value & greeks) for this instrument"""
class Option(Instrument):
def __init__(self, opt_type, S0,K, dt, sigma=None, r=None, yield_=None, exer_type=None):
self.opt_type = opt_type
self.S0 = S0
self.K = K
self.sigma = sigma
self.dt = dt
self.r = r or 0
self.yield_ = yield_ or 0
self.exer_type = exer_type or OptionExerciseType.AMERICAN
self.model_cache = {}
self.model_cache_param_hashes = {}
def copy(self):
return Option(self.opt_type, self.S0, self.K, self.dt,
sigma=self.sigma, r=self.r, yield_=self.yield_, exer_type=self.exer_type)
def param_hash(self):
return hash((self.opt_type, self.S0, self.K, self.dt,
self.sigma, self.r, self.yield_, self.exer_type))
@staticmethod
def imply_sigmaatility(premium, max_sigma=0.99, *args, **kwargs):
def obj_fn(sigma_guess):
kwargs['sigma'] = sigma_guess
val = Option(*args, **kwargs).run_model()[OptionMeasure.VALUE]
return val - premium
try:
return scipy.optimize.bisect(obj_fn, 0.01, max_sigma, xtol=0.0025)
except ValueError:
return None
def run_model(self, model=OptionModel.BINOMIAL_TREE, **kwargs):
curr_param_hash = self.param_hash()
if model in self.model_cache:
prev_param_hash = self.model_cache_param_hashes[model]
if self.param_hash() == prev_param_hash:
return self.model_cache[model]
if model == OptionModel.BLACK_SCHOLES:
result = self.black_scholes(**kwargs)
elif model == OptionModel.BINOMIAL_TREE:
result = self.binomial_tree(**kwargs)
else: # model == OptionModel.MONTE_CARLO:
result = self.monte_carlo(**kwargs)
self.model_cache[model] = result
self.model_cache_param_hashes[model] = curr_param_hash
return result
def black_scholes(self):
assert self.exer_type == OptionExerciseType.EUROPEAN, \
"Black-Scholes does not support early exercise"
assert (self.yield_ is None) or is_number(self.yield_), \
"Black-Scholes does not support discrete dividends"
sqrt_dt = self.dt ** 0.5
d1 = (math.log(float(self.S0) / self.K) + (self.r - self.yield_ + 0.5 * self.sigma * self.sigma) * self.dt) / (self.sigma * sqrt_dt)
d2 = d1 - self.sigma * (self.dt ** 0.5)
d1_pdf = scipy.stats.norm.pdf(d1)
riskless_disc = exp(-self.r * self.dt)
yield_disc = exp(-self.yield_ * self.dt)
if self.opt_type == OptionType.CALL:
d1_cdf = scipy.stats.norm.cdf(d1)
d2_cdf = scipy.stats.norm.cdf(d2)
delta = yield_disc * d1_cdf
val = self.S0 * delta - riskless_disc * self.K * d2_cdf
theta = -yield_disc * (self.S0 * d1_pdf * self.sigma) / (2 * sqrt_dt) - \
self.r * self.K * riskless_disc * d2_cdf + \
self.yield_ * self.S0 * yield_disc * d1_cdf
rho = self.K * self.dt * riskless_disc * d2_cdf
else: # opt_type == OptionType.PUT:
neg_d1_cdf = scipy.stats.norm.cdf(-d1)
neg_d2_cdf = scipy.stats.norm.cdf(-d2)
delta = -yield_disc * neg_d1_cdf
val = riskless_disc * self.K * neg_d2_cdf + self.S0 * delta
theta = -yield_disc * (self.S0 * d1_pdf * self.sigma) / (2 * sqrt_dt) + \
self.r * self.K * riskless_disc * neg_d2_cdf - \
self.yield_ * self.S0 * yield_disc * neg_d1_cdf
rho = -self.K * self.dt * riskless_disc * neg_d2_cdf
vega = self.S0 * yield_disc * d1_pdf * sqrt_dt
gamma = yield_disc * (d1_pdf / (self.S0 * self.sigma * sqrt_dt))
return {
OptionMeasure.VALUE: val,
OptionMeasure.DELTA: delta,
OptionMeasure.THETA: theta,
OptionMeasure.RHO: rho,
OptionMeasure.VEGA: vega,
OptionMeasure.GAMMA: gamma,
}
def binomial_tree(self, num_steps=None, sens_degree=2):
val_num_steps = num_steps or DEFAULT_BINOMIAL_TREE_NUM_STEPS
val_cache = {}
dt = float(self.dt) / val_num_steps
up_fact = exp(self.sigma * (dt ** 0.5))
down_fact = 1.0 / up_fact
cont_yield = 0 if hasattr(self.yield_, '__call__') else self.yield_
prob_up = (exp((self.r - cont_yield) * dt) - down_fact) / (up_fact - down_fact)
def divs_pv(n):
if not hasattr(self.yield_, '__call__'): return 0
pv_divs = 0
for step_num in range(n, val_num_steps):
div_t1, div_t2 = dt * step_num, dt * (step_num + 1)
div = self.yield_(div_t1, div_t2)
if is_number(div):
mid_div_t = 0.5 * (div_t1 + div_t2)
pv_divs += exp(-self.r * mid_div_t) * div
return pv_divs
bin_S0 = self.S0 - divs_pv(0)
def spot_price(n, num_ups, num_downs):
return (bin_S0 + divs_pv(n)) * (up_fact ** (num_ups - num_downs))
def node_value(n, num_ups, num_downs):
value_cache_key = (n, num_ups, num_downs)
if value_cache_key not in val_cache:
spot = spot_price(n, num_ups, num_downs)
if self.opt_type == OptionType.CALL:
exer_profit = max(0, spot - self.K)
else: # opt_type == OptionType.PUT:
exer_profit = max(0, self.K - spot)
if n >= val_num_steps:
val = exer_profit
else:
fv = prob_up * node_value(n + 1, num_ups + 1, num_downs) + \
(1 - prob_up) * node_value(n + 1, num_ups, num_downs + 1)
pv = exp(-self.r * dt) * fv
if self.exer_type == OptionExerciseType.AMERICAN:
val = max(pv, exer_profit)
else: # exer_type == OptionExerciseType.EUROPEAN
val = pv
val_cache[value_cache_key] = val
return val_cache[value_cache_key]
val = node_value(0, 0, 0)
delta, theta, rho, vega, gamma = None, None, None, None, None
if sens_degree >= 1:
delta = (node_value(1, 1, 0) - node_value(1, 0, 1)) / (bin_S0 * up_fact - bin_S0 * down_fact)
theta = (node_value(2, 1, 1) - val) / (2 * dt)
rho = self.sensitivity('r', 0.0001,
model=OptionModel.BINOMIAL_TREE, sens_degree=sens_degree - 1)
vega = self.sensitivity('sigma', 0.001,
model=OptionModel.BINOMIAL_TREE, sens_degree=sens_degree - 1)
delta_up = (node_value(2, 2, 0) - node_value(2, 1, 1)) / (bin_S0 * up_fact - bin_S0 * down_fact)
delta_down = (node_value(2, 1, 1) - node_value(2, 0, 2)) / (bin_S0 * up_fact - bin_S0 * down_fact)
gamma = (delta_up - delta_down) / ((bin_S0 * up_fact * up_fact - bin_S0 * down_fact * down_fact) / 2)
return {
OptionMeasure.VALUE: val,
OptionMeasure.DELTA: delta,
OptionMeasure.THETA: theta,
OptionMeasure.RHO: rho,
OptionMeasure.VEGA: vega,
OptionMeasure.GAMMA: gamma,
}
def monte_carlo(self, num_steps=None, num_paths=None, random_seed=None, sens_degree=2):
val_num_steps = num_steps or DEFAULT_MONTE_CARLO_NUM_STEPS
val_num_paths = num_paths or DEFAULT_MONTE_CARLO_NUM_PATHS
val_random_seed = random_seed or int(time.time() * 1000)
np.random.seed(val_random_seed)
dt = self.dt / val_num_steps
sqrt_dt = dt ** 0.5
cont_yield = 0 if hasattr(self.yield_, '__call__') else self.yield_
drift = (self.r - cont_yield - (self.sigma ** 2) / 2) * dt
def cast_spot_paths():
result = np.empty([val_num_paths, val_num_steps])
for path_num in range(val_num_paths):
result1 = np.empty([val_num_steps])
spot = self.S0
result1[0] = spot
for step_num in range(1, val_num_steps):
spot *= exp(drift + self.sigma * np.random.normal() * sqrt_dt)
if hasattr(self.yield_, '__call__'):
div_t1, div_t2 = dt * step_num, dt * (step_num + 1)
div = self.yield_(div_t1, div_t2)
if is_number(div): spot -= div
result1[step_num] = spot
result[path_num] = result1
return result
def step_exercise_profits(spot_paths, step_num):
exer_profits = np.empty([val_num_paths])
for path_num in range(val_num_paths):
spot = spot_paths[path_num, step_num]
if self.opt_type == OptionType.CALL:
exer_profit = max(0, spot - self.K)
else: # opt_type == OptionType.PUT:
exer_profit = max(0, self.K - spot)
exer_profits[path_num] = exer_profit
return exer_profits
if self.exer_type == OptionExerciseType.EUROPEAN:
spot_paths = cast_spot_paths()
exer_profits = step_exercise_profits(spot_paths, -1)
pvs = exp(-self.r * self.dt) * exer_profits
val = pvs.mean()
else:
spot_paths = cast_spot_paths()
cashflow_paths = np.zeros([val_num_paths, val_num_steps])
cashflow_paths[:, val_num_steps - 1] = step_exercise_profits(spot_paths, -1)
def cashflow_path_pv(path_num, from_step_num=0):
from_cashflow_path = cashflow_paths[path_num, from_step_num:]
max_step_num = from_cashflow_path.argmax()
max_cashflow = from_cashflow_path[max_step_num]
return exp(-self.r * (dt * max_step_num)) * max_cashflow
step_models = [None] * val_num_steps
for step_num in range(val_num_steps - 2, -1, -1):
exer_profits = step_exercise_profits(spot_paths, step_num)
xs, ys = [], []
for path_num in range(val_num_paths):
if exer_profits[path_num] <= 0: continue
xs += [spot_paths[path_num, step_num]]
cont_val = cashflow_path_pv(path_num, step_num + 1)
ys += [exp(-self.r * dt) * cont_val]
if len(xs) == 0: continue
basis_xs = [xs, np.multiply(xs, xs), [1] * len(xs)]
[x_coeff, x_sq_coeff, int_coeff] = np.linalg.lstsq(np.transpose(np.array(basis_xs)), ys)[0]
step_models[step_num] = [x_coeff, x_sq_coeff, int_coeff]
for path_num in range(val_num_paths):
exer_profit = exer_profits[path_num]
if exer_profit <= 0: continue
spot = spot_paths[path_num, step_num]
cont_pv = (x_coeff * spot) + (x_sq_coeff * spot * spot) + int_coeff
if exer_profit > cont_pv:
cashflow_paths[path_num, step_num] = exer_profit
cashflow_paths[path_num, (step_num + 1):] = 0
pvs = []
for path_num in range(val_num_paths):
step_num = cashflow_paths[path_num].argmax()
cashflow = cashflow_paths[path_num][step_num]
pvs += [exp(-self.r * (dt * step_num)) * cashflow]
val = np.array(pvs).mean()
delta, theta, rho, vega, gamma = None, None, None, None, None
if sens_degree >= 1:
delta = self.sensitivity('S0', self.S0 * 1.0e-05, model=OptionModel.MONTE_CARLO,
num_steps=num_steps, num_paths=num_paths, random_seed=random_seed, sens_degree=sens_degree - 1)
theta = -self.sensitivity('dt', 0.001, model=OptionModel.MONTE_CARLO,
num_steps=num_steps, num_paths=num_paths, random_seed=random_seed, sens_degree=sens_degree - 1)
rho = self.sensitivity('r', 0.0001, model=OptionModel.MONTE_CARLO,
num_steps=num_steps, num_paths=num_paths, random_seed=random_seed, sens_degree=sens_degree - 1)
vega = self.sensitivity('sigma', 0.001, model=OptionModel.MONTE_CARLO,
num_steps=num_steps, num_paths=num_paths, random_seed=random_seed, sens_degree=sens_degree - 1)
if sens_degree >= 2:
gamma = self.sensitivity('S0', self.S0 * 0.05, opt_measure=OptionMeasure.DELTA, model=OptionModel.MONTE_CARLO,
num_steps=num_steps, num_paths=num_paths, random_seed=random_seed, sens_degree=sens_degree - 1)
return {
OptionMeasure.VALUE: val,
OptionMeasure.DELTA: delta,
OptionMeasure.THETA: theta,
OptionMeasure.RHO: rho,
OptionMeasure.VEGA: vega,
OptionMeasure.GAMMA: gamma,
}
def sensitivity(self, opt_param, opt_param_bump, opt_measure=OptionMeasure.VALUE,
model=OptionModel.BINOMIAL_TREE, **model_kwargs):
up_opt = self.copy()
setattr(up_opt, opt_param, getattr(up_opt, opt_param) + opt_param_bump)
down_opt = self.copy()
setattr(down_opt, opt_param, getattr(down_opt, opt_param) - opt_param_bump)
up_measure = up_opt.run_model(model=model, **model_kwargs)[opt_measure]
down_measure = down_opt.run_model(model=model, **model_kwargs)[opt_measure]
return (up_measure - down_measure) / (2 * opt_param_bump)
def is_number(x):
return (x is not None) and \
(isinstance(x, int) or isinstance(x, float) or isinstance(x, numbers.Integral))
# here you can change arguments
print (Option(opt_type=OptionType.CALL, S0=100, K=95, dt=3.0/12, sigma=0.50, r=0.10, exer_type=OptionExerciseType.EUROPEAN).run_model(model=OptionModel.BLACK_SCHOLES)[OptionMeasure.VALUE])
# expected output is 13.695272738608146
@39mikufans
Copy link
Author

At that time, I still used python2 when I wrote it. Although it doesn’t seem to be verbose now, a lot of grammar seems to be unavailable anymore... I think i will find a time to rewrite it...

@39mikufans
Copy link
Author

Now it can be used, but the way to enter arguments is too cumbersome, i will let it better in the future.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment