Created
February 23, 2017 08:15
-
-
Save fanannan/ad04d9e4e4532725e822e017aa3ee032 to your computer and use it in GitHub Desktop.
simple vxx trade simulation by gaussian hmm
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import pickle | |
import warnings | |
import math | |
from hmmlearn.hmm import GaussianHMM, GMMHMM | |
from matplotlib import cm, pyplot as plt | |
from matplotlib.dates import YearLocator, MonthLocator | |
import numpy as np | |
import pandas as pd | |
import seaborn as sns | |
import pandas_datareader.data as web | |
warnings.filterwarnings("ignore") | |
xiv = web.DataReader('XIV', 'yahoo', '2008-01-01') | |
vxx = web.DataReader('VXX', 'yahoo', '2008-01-01') | |
vix = web.DataReader('^VIX', 'yahoo', '2008-01-01') | |
vxv = web.DataReader('^VXV', 'yahoo', '2008-01-01') | |
spy = web.DataReader('^GSPC', 'yahoo', '2008-01-01') | |
def YangZhang(prices, window=30): | |
log_ho = (prices['High'] / prices['Open']).apply(np.log) | |
log_lo = (prices['Low'] / prices['Open']).apply(np.log) | |
log_co = (prices['Close'] / prices['Open']).apply(np.log) | |
log_oc = (prices['Open'] / prices['Close'].shift(1)).apply(np.log) | |
log_oc_sq = log_oc**2 | |
log_cc = (prices['Close'] / prices['Close'].shift(1)).apply(np.log) | |
log_cc_sq = log_cc**2 | |
rs = log_ho * (log_ho - log_co) + log_lo * (log_lo - log_co) | |
close_vol = log_cc_sq.rolling(window=window,center=False).sum() * (1.0 / (window - 1.0)) | |
open_vol = log_oc_sq.rolling(window=window,center=False).sum() * (1.0 / (window - 1.0)) | |
window_rs = rs.rolling(window=window,center=False).sum() * (1.0 / (window - 1.0)) | |
result = (open_vol + 0.164333 * close_vol + 0.835667 * window_rs).apply(np.sqrt) * math.sqrt(252) | |
result[:window-1] = np.nan | |
return result | |
spy_close = spy["Adj Close"] | |
vxv_close = vxv["Adj Close"] | |
vix_close = vix["Adj Close"] | |
vxx_close = vxx["Adj Close"] | |
xiv_close = xiv["Adj Close"] | |
vix_yzv = YangZhang(vix) | |
vxx_yzv = YangZhang(vxx) | |
vxv_yzv = YangZhang(vxv) | |
contango_close = vix_close/vxv_close | |
spy_change, vix_change, vxv_change, contango_change, vxx_change, xiv_change = [(s.pct_change()+1).apply(np.log) for s in | |
[spy_close, vix_close, vxv_close, contango_close, vxx_close, xiv_close]] | |
spy_close.name, vix_close.name, vxv_close.name, contango_close.name, vxx_close.name, xiv_close.name = \ | |
'spy_close', 'vix_close', 'vxv_close', 'contango_close', 'vxx_close', 'xiv_close' | |
spy_change.name, vix_change.name, vxv_change.name, contango_change.name, vxx_change.name, xiv_change.name = \ | |
'spy_change', 'vix_change', 'vxv_change','contango_change', 'vxx_change', 'xiv_change' | |
vix_yzv.name, vxx_yzv.name = 'vix_yzvolatility', 'vxv_yzvolatility' | |
#df = pd.concat([price, spy_change, vix_change, contango_change, vxx_change], axis=1).dropna() # contango_close | |
#df = pd.concat([price, vix_change, vxv_change, contango_change, vxx_change, xiv_change, vix_yzv,], axis=1).dropna() # contango_close vxx_yzv vxv_yzv | |
df = pd.concat([vix_change, vxv_change, contango_change, vxx_change, xiv_change, ], axis=1).dropna() # contango_close vxx_yzv vxv_yzv | |
def build_models(df, price, num_test, num_models, num_iter=500): | |
df_data = pd.concat([df, price], axis=1).dropna() | |
df_train = df_data[:-num_test] | |
df_test = df_data[-num_test:] | |
s_price_train = df_data[price.name][:-num_test] | |
s_price_test = df_data[price.name][-num_test:] | |
ar_train = df_train.drop([price.name], axis=1).values | |
ar_test = df_test.drop([price.name], axis=1).values | |
# | |
models = list() | |
i = 0 | |
while len(models) <= num_models: | |
i = i+1 | |
print('training {0}th model, {1}model built'.format(i, len(models))) | |
#covariance_type="full" | |
covariance_type="diag" | |
model = GaussianHMM(n_components=2, | |
#model = GMMHMM(n_components=2, n_mix=4, | |
covariance_type=covariance_type, | |
n_iter=num_iter, | |
algorithm='viterbi', # if np.random.rand() > 0.5 else 'map', | |
random_state=np.random.RandomState(i)).fit(ar_train) | |
#fr = np.random.rand() | |
#model.startprob_ = np.array([fr, 1-fr]) | |
#model = GMMHMM(n_components=3, covariance_type=covariance_type, n_iter=5000).fit(d) | |
train_states = model.predict(ar_train) | |
c0 = len([v for v in train_states if v==0]) | |
c1 = len([v for v in train_states if v==1]) | |
print(c0,c1) | |
if c0 > c1: | |
f = 0 | |
elif c1 > c0: | |
f = 1 | |
else: | |
f = -1 | |
if f > -1: | |
models.append((f, model)) | |
return models, ar_train, ar_test, s_price_train, s_price_test | |
def apply_models(models, ar_data): | |
bull_list = list() | |
#bear_list = list() | |
for f, model in models: | |
#states = model.predict(ar_train) | |
probs = model.predict_proba(ar_data) | |
bull_list.append([v[f] for v in probs]) | |
#bear_list.append([v[1-f] for v in probs]) | |
bull_probs = [np.array(bull_list)[:, i].mean() for i in range(len(ar_data))] | |
#bear = [np.array(bear_list)[:, i].mean() for i in range(len(ar_data))] | |
return bull_probs #, bear_probs | |
def draw_status(ax, bull_probs, s_price, threshold, inverse): | |
mask0 = [p > threshold for p in bull_probs] | |
mask1 = [p < 1-threshold for p in bull_probs] | |
ax.plot_date(s_price.index, s_price, ".", linestyle='none', c='black', alpha=0.3) | |
ax.plot_date(s_price.index[mask0], s_price[mask0], ".", linestyle='none', c='red', alpha=0.5) | |
ax.plot_date(s_price.index[mask1], s_price[mask1], ".", linestyle='none', c='blue', alpha=0.5) | |
#ax.set_title("Hidden State") | |
ax.xaxis.set_major_locator(YearLocator()) | |
ax.xaxis.set_minor_locator(MonthLocator()) | |
ax.set_yscale('log') | |
ax.grid(True) | |
# | |
bx = ax.twinx() | |
bx.grid() | |
r = pd.DataFrame(s_price) | |
r['mask_0'] = mask0 | |
r['mask_1'] = mask1 | |
r['return'] = s_price.pct_change().shift(-1) | |
r['return_0'] = r['return']*r['mask_0'] | |
r['return_1'] = r['return']*r['mask_1'] | |
#print(r) | |
if inverse: | |
bx.plot(r.index, (1/(r['return']+1)).cumprod(), color='grey') | |
bx.plot(r.index, (1/(r['return_0']+1)).cumprod(), c='red') | |
bx.plot(r.index, (1/(r['return_1']+1)).cumprod(), c='blue') | |
else: | |
bx.plot(r.index, (r['return']+1).cumprod(), color='grey') | |
bx.plot(r.index, (r['return_0']+1).cumprod(), c='red') | |
bx.plot(r.index, (r['return_1']+1).cumprod(), c='blue') | |
price, price.name = vxx_close, 'vxx_close' | |
#price, price.name = vix_close, 'vix_close' | |
#price, price.name = xiv_close, 'xiv_close' | |
num_test = int(len(price)*0.12) | |
num_models = 5 | |
threshold = 0.5 | |
inverse = price.name in ['vix_close', 'vxv_close', 'vxx_close'] | |
models, ar_train, ar_test, s_price_train, s_price_test = build_models(df, price, num_test, num_models, num_iter=5000) | |
bull_prob_train = apply_models(models, ar_train) | |
bull_prob_test = apply_models(models, ar_test) | |
fig, axs = plt.subplots(2, figsize=(20, 12)) | |
draw_status(axs[0], bull_prob_train, s_price_train, threshold, inverse) | |
draw_status(axs[1], bull_prob_test, s_price_test, threshold, inverse) | |
plt.show() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment