Last active
October 14, 2020 06:23
-
-
Save 39mikufans/9c67e29001c0347f86b978445e0d6b3a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import abc | |
from math import exp | |
import math | |
import numbers | |
import time | |
import numpy as np | |
import scipy.optimize | |
import scipy.stats | |
# Copyright (c) <2020> <TouhouSupport(Ameame)> | |
# "Anti 996" License Version 1.0 (Draft) | |
# Permission is hereby granted to any individual or legal entity | |
# obtaining a copy of this licensed work (including the source code, | |
# documentation and/or related items, hereinafter collectively referred | |
# to as the "licensed work"), free of charge, to deal with the licensed | |
# work for any purpose, including without limitation, the rights to use, | |
# reproduce, modify, prepare derivative works of, distribute, publish | |
# and sublicense the licensed work, subject to the following conditions: | |
# 1. The individual or the legal entity must conspicuously display, | |
# without modification, this License and the notice on each redistributed | |
# or derivative copy of the Licensed Work. | |
# 2. The individual or the legal entity must strictly comply with all | |
# applicable laws, regulations, rules and standards of the jurisdiction | |
# relating to labor and employment where the individual is physically | |
# located or where the individual was born or naturalized; or where the | |
# legal entity is registered or is operating (whichever is stricter). In | |
# case that the jurisdiction has no such laws, regulations, rules and | |
# standards or its laws, regulations, rules and standards are | |
# unenforceable, the individual or the legal entity are required to | |
# comply with Core International Labor Standards. | |
# 3. The individual or the legal entity shall not induce, suggest or force | |
# its employee(s), whether full-time or part-time, or its independent | |
# contractor(s), in any methods, to agree in oral or written form, to | |
# directly or indirectly restrict, weaken or relinquish his or her | |
# rights or remedies under such laws, regulations, rules and standards | |
# relating to labor and employment as mentioned above, no matter whether | |
# such written or oral agreements are enforceable under the laws of the | |
# said jurisdiction, nor shall such individual or the legal entity | |
# limit, in any methods, the rights of its employee(s) or independent | |
# contractor(s) from reporting or complaining to the copyright holder or | |
# relevant authorities monitoring the compliance of the license about | |
# its violation(s) of the said license. | |
# THE LICENSED WORK IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, | |
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | |
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. | |
# IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY CLAIM, | |
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR | |
# OTHERWISE, ARISING FROM, OUT OF OR IN ANY WAY CONNECTION WITH THE | |
# LICENSED WORK OR THE USE OR OTHER DEALINGS IN THE LICENSED WORK. | |
def enum(**enums): | |
return type('Enum', (), enums) | |
OptionType = enum(CALL='call', PUT='put') | |
OptionExerciseType = enum(EUROPEAN='european', AMERICAN='american') | |
OptionModel = enum(BLACK_SCHOLES='black_scholes', BINOMIAL_TREE='binomial_tree', MONTE_CARLO='monte_carlo') | |
OptionMeasure = enum(VALUE='value', DELTA='delta', THETA='theta', RHO='rho', VEGA='vega', GAMMA='gamma') | |
# you can change steps to get a more precise | |
DEFAULT_BINOMIAL_TREE_NUM_STEPS = 25 | |
DEFAULT_MONTE_CARLO_NUM_STEPS = 50 | |
DEFAULT_MONTE_CARLO_NUM_PATHS = 100 | |
class Instrument(object): | |
@abc.abstractmethod | |
def run_model(self): | |
"""Calculate measures (i.e. theoretical value & greeks) for this instrument""" | |
class Option(Instrument): | |
def __init__(self, opt_type, S0,K, dt, sigma=None, r=None, yield_=None, exer_type=None): | |
self.opt_type = opt_type | |
self.S0 = S0 | |
self.K = K | |
self.sigma = sigma | |
self.dt = dt | |
self.r = r or 0 | |
self.yield_ = yield_ or 0 | |
self.exer_type = exer_type or OptionExerciseType.AMERICAN | |
self.model_cache = {} | |
self.model_cache_param_hashes = {} | |
def copy(self): | |
return Option(self.opt_type, self.S0, self.K, self.dt, | |
sigma=self.sigma, r=self.r, yield_=self.yield_, exer_type=self.exer_type) | |
def param_hash(self): | |
return hash((self.opt_type, self.S0, self.K, self.dt, | |
self.sigma, self.r, self.yield_, self.exer_type)) | |
@staticmethod | |
def imply_sigmaatility(premium, max_sigma=0.99, *args, **kwargs): | |
def obj_fn(sigma_guess): | |
kwargs['sigma'] = sigma_guess | |
val = Option(*args, **kwargs).run_model()[OptionMeasure.VALUE] | |
return val - premium | |
try: | |
return scipy.optimize.bisect(obj_fn, 0.01, max_sigma, xtol=0.0025) | |
except ValueError: | |
return None | |
def run_model(self, model=OptionModel.BINOMIAL_TREE, **kwargs): | |
curr_param_hash = self.param_hash() | |
if model in self.model_cache: | |
prev_param_hash = self.model_cache_param_hashes[model] | |
if self.param_hash() == prev_param_hash: | |
return self.model_cache[model] | |
if model == OptionModel.BLACK_SCHOLES: | |
result = self.black_scholes(**kwargs) | |
elif model == OptionModel.BINOMIAL_TREE: | |
result = self.binomial_tree(**kwargs) | |
else: # model == OptionModel.MONTE_CARLO: | |
result = self.monte_carlo(**kwargs) | |
self.model_cache[model] = result | |
self.model_cache_param_hashes[model] = curr_param_hash | |
return result | |
def black_scholes(self): | |
assert self.exer_type == OptionExerciseType.EUROPEAN, \ | |
"Black-Scholes does not support early exercise" | |
assert (self.yield_ is None) or is_number(self.yield_), \ | |
"Black-Scholes does not support discrete dividends" | |
sqrt_dt = self.dt ** 0.5 | |
d1 = (math.log(float(self.S0) / self.K) + (self.r - self.yield_ + 0.5 * self.sigma * self.sigma) * self.dt) / (self.sigma * sqrt_dt) | |
d2 = d1 - self.sigma * (self.dt ** 0.5) | |
d1_pdf = scipy.stats.norm.pdf(d1) | |
riskless_disc = exp(-self.r * self.dt) | |
yield_disc = exp(-self.yield_ * self.dt) | |
if self.opt_type == OptionType.CALL: | |
d1_cdf = scipy.stats.norm.cdf(d1) | |
d2_cdf = scipy.stats.norm.cdf(d2) | |
delta = yield_disc * d1_cdf | |
val = self.S0 * delta - riskless_disc * self.K * d2_cdf | |
theta = -yield_disc * (self.S0 * d1_pdf * self.sigma) / (2 * sqrt_dt) - \ | |
self.r * self.K * riskless_disc * d2_cdf + \ | |
self.yield_ * self.S0 * yield_disc * d1_cdf | |
rho = self.K * self.dt * riskless_disc * d2_cdf | |
else: # opt_type == OptionType.PUT: | |
neg_d1_cdf = scipy.stats.norm.cdf(-d1) | |
neg_d2_cdf = scipy.stats.norm.cdf(-d2) | |
delta = -yield_disc * neg_d1_cdf | |
val = riskless_disc * self.K * neg_d2_cdf + self.S0 * delta | |
theta = -yield_disc * (self.S0 * d1_pdf * self.sigma) / (2 * sqrt_dt) + \ | |
self.r * self.K * riskless_disc * neg_d2_cdf - \ | |
self.yield_ * self.S0 * yield_disc * neg_d1_cdf | |
rho = -self.K * self.dt * riskless_disc * neg_d2_cdf | |
vega = self.S0 * yield_disc * d1_pdf * sqrt_dt | |
gamma = yield_disc * (d1_pdf / (self.S0 * self.sigma * sqrt_dt)) | |
return { | |
OptionMeasure.VALUE: val, | |
OptionMeasure.DELTA: delta, | |
OptionMeasure.THETA: theta, | |
OptionMeasure.RHO: rho, | |
OptionMeasure.VEGA: vega, | |
OptionMeasure.GAMMA: gamma, | |
} | |
def binomial_tree(self, num_steps=None, sens_degree=2): | |
val_num_steps = num_steps or DEFAULT_BINOMIAL_TREE_NUM_STEPS | |
val_cache = {} | |
dt = float(self.dt) / val_num_steps | |
up_fact = exp(self.sigma * (dt ** 0.5)) | |
down_fact = 1.0 / up_fact | |
cont_yield = 0 if hasattr(self.yield_, '__call__') else self.yield_ | |
prob_up = (exp((self.r - cont_yield) * dt) - down_fact) / (up_fact - down_fact) | |
def divs_pv(n): | |
if not hasattr(self.yield_, '__call__'): return 0 | |
pv_divs = 0 | |
for step_num in range(n, val_num_steps): | |
div_t1, div_t2 = dt * step_num, dt * (step_num + 1) | |
div = self.yield_(div_t1, div_t2) | |
if is_number(div): | |
mid_div_t = 0.5 * (div_t1 + div_t2) | |
pv_divs += exp(-self.r * mid_div_t) * div | |
return pv_divs | |
bin_S0 = self.S0 - divs_pv(0) | |
def spot_price(n, num_ups, num_downs): | |
return (bin_S0 + divs_pv(n)) * (up_fact ** (num_ups - num_downs)) | |
def node_value(n, num_ups, num_downs): | |
value_cache_key = (n, num_ups, num_downs) | |
if value_cache_key not in val_cache: | |
spot = spot_price(n, num_ups, num_downs) | |
if self.opt_type == OptionType.CALL: | |
exer_profit = max(0, spot - self.K) | |
else: # opt_type == OptionType.PUT: | |
exer_profit = max(0, self.K - spot) | |
if n >= val_num_steps: | |
val = exer_profit | |
else: | |
fv = prob_up * node_value(n + 1, num_ups + 1, num_downs) + \ | |
(1 - prob_up) * node_value(n + 1, num_ups, num_downs + 1) | |
pv = exp(-self.r * dt) * fv | |
if self.exer_type == OptionExerciseType.AMERICAN: | |
val = max(pv, exer_profit) | |
else: # exer_type == OptionExerciseType.EUROPEAN | |
val = pv | |
val_cache[value_cache_key] = val | |
return val_cache[value_cache_key] | |
val = node_value(0, 0, 0) | |
delta, theta, rho, vega, gamma = None, None, None, None, None | |
if sens_degree >= 1: | |
delta = (node_value(1, 1, 0) - node_value(1, 0, 1)) / (bin_S0 * up_fact - bin_S0 * down_fact) | |
theta = (node_value(2, 1, 1) - val) / (2 * dt) | |
rho = self.sensitivity('r', 0.0001, | |
model=OptionModel.BINOMIAL_TREE, sens_degree=sens_degree - 1) | |
vega = self.sensitivity('sigma', 0.001, | |
model=OptionModel.BINOMIAL_TREE, sens_degree=sens_degree - 1) | |
delta_up = (node_value(2, 2, 0) - node_value(2, 1, 1)) / (bin_S0 * up_fact - bin_S0 * down_fact) | |
delta_down = (node_value(2, 1, 1) - node_value(2, 0, 2)) / (bin_S0 * up_fact - bin_S0 * down_fact) | |
gamma = (delta_up - delta_down) / ((bin_S0 * up_fact * up_fact - bin_S0 * down_fact * down_fact) / 2) | |
return { | |
OptionMeasure.VALUE: val, | |
OptionMeasure.DELTA: delta, | |
OptionMeasure.THETA: theta, | |
OptionMeasure.RHO: rho, | |
OptionMeasure.VEGA: vega, | |
OptionMeasure.GAMMA: gamma, | |
} | |
def monte_carlo(self, num_steps=None, num_paths=None, random_seed=None, sens_degree=2): | |
val_num_steps = num_steps or DEFAULT_MONTE_CARLO_NUM_STEPS | |
val_num_paths = num_paths or DEFAULT_MONTE_CARLO_NUM_PATHS | |
val_random_seed = random_seed or int(time.time() * 1000) | |
np.random.seed(val_random_seed) | |
dt = self.dt / val_num_steps | |
sqrt_dt = dt ** 0.5 | |
cont_yield = 0 if hasattr(self.yield_, '__call__') else self.yield_ | |
drift = (self.r - cont_yield - (self.sigma ** 2) / 2) * dt | |
def cast_spot_paths(): | |
result = np.empty([val_num_paths, val_num_steps]) | |
for path_num in range(val_num_paths): | |
result1 = np.empty([val_num_steps]) | |
spot = self.S0 | |
result1[0] = spot | |
for step_num in range(1, val_num_steps): | |
spot *= exp(drift + self.sigma * np.random.normal() * sqrt_dt) | |
if hasattr(self.yield_, '__call__'): | |
div_t1, div_t2 = dt * step_num, dt * (step_num + 1) | |
div = self.yield_(div_t1, div_t2) | |
if is_number(div): spot -= div | |
result1[step_num] = spot | |
result[path_num] = result1 | |
return result | |
def step_exercise_profits(spot_paths, step_num): | |
exer_profits = np.empty([val_num_paths]) | |
for path_num in range(val_num_paths): | |
spot = spot_paths[path_num, step_num] | |
if self.opt_type == OptionType.CALL: | |
exer_profit = max(0, spot - self.K) | |
else: # opt_type == OptionType.PUT: | |
exer_profit = max(0, self.K - spot) | |
exer_profits[path_num] = exer_profit | |
return exer_profits | |
if self.exer_type == OptionExerciseType.EUROPEAN: | |
spot_paths = cast_spot_paths() | |
exer_profits = step_exercise_profits(spot_paths, -1) | |
pvs = exp(-self.r * self.dt) * exer_profits | |
val = pvs.mean() | |
else: | |
spot_paths = cast_spot_paths() | |
cashflow_paths = np.zeros([val_num_paths, val_num_steps]) | |
cashflow_paths[:, val_num_steps - 1] = step_exercise_profits(spot_paths, -1) | |
def cashflow_path_pv(path_num, from_step_num=0): | |
from_cashflow_path = cashflow_paths[path_num, from_step_num:] | |
max_step_num = from_cashflow_path.argmax() | |
max_cashflow = from_cashflow_path[max_step_num] | |
return exp(-self.r * (dt * max_step_num)) * max_cashflow | |
step_models = [None] * val_num_steps | |
for step_num in range(val_num_steps - 2, -1, -1): | |
exer_profits = step_exercise_profits(spot_paths, step_num) | |
xs, ys = [], [] | |
for path_num in range(val_num_paths): | |
if exer_profits[path_num] <= 0: continue | |
xs += [spot_paths[path_num, step_num]] | |
cont_val = cashflow_path_pv(path_num, step_num + 1) | |
ys += [exp(-self.r * dt) * cont_val] | |
if len(xs) == 0: continue | |
basis_xs = [xs, np.multiply(xs, xs), [1] * len(xs)] | |
[x_coeff, x_sq_coeff, int_coeff] = np.linalg.lstsq(np.transpose(np.array(basis_xs)), ys)[0] | |
step_models[step_num] = [x_coeff, x_sq_coeff, int_coeff] | |
for path_num in range(val_num_paths): | |
exer_profit = exer_profits[path_num] | |
if exer_profit <= 0: continue | |
spot = spot_paths[path_num, step_num] | |
cont_pv = (x_coeff * spot) + (x_sq_coeff * spot * spot) + int_coeff | |
if exer_profit > cont_pv: | |
cashflow_paths[path_num, step_num] = exer_profit | |
cashflow_paths[path_num, (step_num + 1):] = 0 | |
pvs = [] | |
for path_num in range(val_num_paths): | |
step_num = cashflow_paths[path_num].argmax() | |
cashflow = cashflow_paths[path_num][step_num] | |
pvs += [exp(-self.r * (dt * step_num)) * cashflow] | |
val = np.array(pvs).mean() | |
delta, theta, rho, vega, gamma = None, None, None, None, None | |
if sens_degree >= 1: | |
delta = self.sensitivity('S0', self.S0 * 1.0e-05, model=OptionModel.MONTE_CARLO, | |
num_steps=num_steps, num_paths=num_paths, random_seed=random_seed, sens_degree=sens_degree - 1) | |
theta = -self.sensitivity('dt', 0.001, model=OptionModel.MONTE_CARLO, | |
num_steps=num_steps, num_paths=num_paths, random_seed=random_seed, sens_degree=sens_degree - 1) | |
rho = self.sensitivity('r', 0.0001, model=OptionModel.MONTE_CARLO, | |
num_steps=num_steps, num_paths=num_paths, random_seed=random_seed, sens_degree=sens_degree - 1) | |
vega = self.sensitivity('sigma', 0.001, model=OptionModel.MONTE_CARLO, | |
num_steps=num_steps, num_paths=num_paths, random_seed=random_seed, sens_degree=sens_degree - 1) | |
if sens_degree >= 2: | |
gamma = self.sensitivity('S0', self.S0 * 0.05, opt_measure=OptionMeasure.DELTA, model=OptionModel.MONTE_CARLO, | |
num_steps=num_steps, num_paths=num_paths, random_seed=random_seed, sens_degree=sens_degree - 1) | |
return { | |
OptionMeasure.VALUE: val, | |
OptionMeasure.DELTA: delta, | |
OptionMeasure.THETA: theta, | |
OptionMeasure.RHO: rho, | |
OptionMeasure.VEGA: vega, | |
OptionMeasure.GAMMA: gamma, | |
} | |
def sensitivity(self, opt_param, opt_param_bump, opt_measure=OptionMeasure.VALUE, | |
model=OptionModel.BINOMIAL_TREE, **model_kwargs): | |
up_opt = self.copy() | |
setattr(up_opt, opt_param, getattr(up_opt, opt_param) + opt_param_bump) | |
down_opt = self.copy() | |
setattr(down_opt, opt_param, getattr(down_opt, opt_param) - opt_param_bump) | |
up_measure = up_opt.run_model(model=model, **model_kwargs)[opt_measure] | |
down_measure = down_opt.run_model(model=model, **model_kwargs)[opt_measure] | |
return (up_measure - down_measure) / (2 * opt_param_bump) | |
def is_number(x): | |
return (x is not None) and \ | |
(isinstance(x, int) or isinstance(x, float) or isinstance(x, numbers.Integral)) | |
# here you can change arguments | |
print (Option(opt_type=OptionType.CALL, S0=100, K=95, dt=3.0/12, sigma=0.50, r=0.10, exer_type=OptionExerciseType.EUROPEAN).run_model(model=OptionModel.BLACK_SCHOLES)[OptionMeasure.VALUE]) | |
# expected output is 13.695272738608146 |
Now it can be used, but the way to enter arguments is too cumbersome, i will let it better in the future.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
At that time, I still used python2 when I wrote it. Although it doesn’t seem to be verbose now, a lot of grammar seems to be unavailable anymore... I think i will find a time to rewrite it...