Skip to content

Instantly share code, notes, and snippets.

@dat-boris
Last active February 28, 2017 05:18
Show Gist options
  • Save dat-boris/75c7006f8c103a60aed35ed880860f9d to your computer and use it in GitHub Desktop.
Save dat-boris/75c7006f8c103a60aed35ed880860f9d to your computer and use it in GitHub Desktop.
Quantopian tool for researching pair trading algorithm
"""
A simple tool for researching Pairs based on:
https://www.quantopian.com/clone_notebook?id=57ed7c41144f8837290000da
"""
from datetime import date, timedelta
import matplotlib.pyplot as plt
from statsmodels.tsa.stattools import coint
from zipline import TradingAlgorithm
import pyfolio as pf
# Globals the surrounding notebook is expected to define before Pair is used
# (they are referenced below but not defined in this file):
#   CROSS_VALIDATION_BOUNDARY = [[start1, end1]]  # one [start, end] entry per validation segment
#   STOCK_PAIRS = []                              # the commented-out examples call Pair(*STOCK_PAIRS[i])
#   PRICE_USED                                    # passed as `fields=` to get_pricing
# Extra history requested ahead of a segment: Pair.get_pricing subtracts this
# from the segment's start date when fetching prices.
LOOKBACK_WINDOW = timedelta(days=60)
class Pair(object):
    """
    A candidate pair of securities studied over one cross-validation segment.

    The class relies on names that this file does not define and that the
    surrounding notebook must supply: ``CROSS_VALIDATION_BOUNDARY`` (indexable
    by segment, each entry unpacking to a start and an end date),
    ``PRICE_USED`` and a module-level ``get_pricing`` function (presumably the
    Quantopian research helper -- confirm).
    """

    def __init__(self, s1, s2, validation_segment=0):
        """
        Store the two securities and resolve the date range of the segment.

        :param s1: first security of the pair
        :param s2: second security of the pair
        :param validation_segment: index into ``CROSS_VALIDATION_BOUNDARY``
        """
        self.s1 = s1
        self.s2 = s2
        self.validation_segment = validation_segment
        self.start_date, self.end_date = CROSS_VALIDATION_BOUNDARY[self.validation_segment]
        # One pricing slot per validation segment; filled in by get_pricing().
        self.pricing = [None] * len(CROSS_VALIDATION_BOUNDARY)
        # Populated by test_trading(); initialised here so that reading them
        # before a backtest has run never fails on a missing attribute.
        self.results = None
        self.sharpe = None

    def get_pricing(self):
        """
        Fetch prices for both securities and cache them for this segment.

        The request starts ``LOOKBACK_WINDOW`` before the segment's start
        date; missing values are back-filled.

        :return: ``self`` so that calls can be chained
        """
        # Inside a method body the bare name ``get_pricing`` resolves to the
        # module-level function of that name, not to this method.
        self.pricing[self.validation_segment] = get_pricing(
            [self.s1, self.s2],
            fields=PRICE_USED,
            start_date=self.start_date - LOOKBACK_WINDOW,
            # ensure that we don't fall on a holiday
            end_date=self.end_date
        ).fillna(method='backfill')
        return self

    def plot(self):
        """Plot the cached pricing data of the current segment."""
        self.pricing[self.validation_segment].plot()

    def coint_test(self, plot=True):
        """
        Run a cointegration test on the two price series of this segment.

        For a good pair, we should be able to see a very low co-integration
        p-value (i.e. < 0.05).

        :param plot: when true, also plot the price spread and its mean
        :return: the p-value reported by ``coint``
        """
        pricing_data = self.pricing[self.validation_segment]
        X = pricing_data[self.s1]
        Y = pricing_data[self.s2]

        if plot:
            spread = Y - X
            spread.plot()  # Plot the spread
            plt.axhline(spread.mean(), color='red', linestyle='--')  # Add the mean
            plt.xlabel('Time')
            plt.legend(['Price Spread', 'Mean'])

        # compute the p-value of the cointegration test
        # will inform us as to whether the spread between the 2 timeseries is
        # stationary around its mean
        _, pvalue, _ = coint(X, Y)
        # Single-argument print(...) emits the same text on Python 2 and 3.
        print("Coint ({} to {}) {} vs {}: {}".format(
            self.start_date.date(), self.end_date.date(),
            self.s1, self.s2, pvalue))
        return pvalue

    def test_trading(self):
        """
        Backtest the pair with zipline over this segment.

        The results and the annualised Sharpe ratio are also stored on the
        instance as ``self.results`` and ``self.sharpe``.

        :return: tuple ``(results, sharpe)``
        """
        algo_initialize = get_backtest_algo(self.s1, self.s2)
        # see https://www.quantopian.com/research/notebooks/201609-pead-reversion/Value-v-Glamour%20stock.ipynb for setup
        # http://www.zipline.io/appendix.html
        algo_obj = TradingAlgorithm(
            initialize=algo_initialize,
            #before_trading_start=check_pair_status,
            start=self.start_date,
            data_frequency='daily',
            end=self.end_date,
        )

        # Run algorithms
        pricing_data = self.pricing[self.validation_segment]
        results = algo_obj.run(
            pricing_data,  #.transpose(2,1,0),
            overwrite_sim_params=False
        )

        # Mean and standard deviation of returns, both scaled by 252 periods.
        # ``np`` comes from the file-level numpy import.
        sharpe = (results.returns.mean() * 252) / (results.returns.std() * np.sqrt(252))
        print("The Sharpe ratio is %0.6f" % sharpe)

        self.results = results
        self.sharpe = sharpe
        return results, sharpe

    def tearsheet_from_results(self, simple=True):
        """
        Render a pyfolio tear sheet from the stored backtest results.

        :param simple: when true create the returns tear sheet only,
            otherwise the full tear sheet
        :raises AttributeError: if no results are stored yet, i.e.
            ``test_trading()`` has not been run on this instance
        """
        results = getattr(self, 'results', None)
        if results is None:
            # Same exception type as the bare attribute lookup used to raise,
            # but with a message that says what to do about it.
            raise AttributeError(
                "No backtest results available; call test_trading() first")

        algo_returns, positions, algo_transactions, gross_lev = \
            pf.utils.extract_rets_pos_txn_from_zipline(results)

        if simple:
            pf.create_returns_tear_sheet(algo_returns)
        else:
            pf.create_full_tear_sheet(
                algo_returns,
                positions=positions,
                transactions=algo_transactions,
                gross_lev=gross_lev
            )
# test_pair = Pair(*STOCK_PAIRS[1])
# test_pair.get_pricing()
# #test_pair.plot()
# # should see a low value
# #test_pair.coint_test()
# results, sharpe = test_pair.test_trading()
# source from https://www.quantopian.com/lectures#Example:-Pairs-Trading-Algorithm
import numpy as np
import statsmodels.api as sm
import pandas as pd
from zipline.utils import tradingcalendar
import pytz
import itertools
from zipline.api import (
schedule_function, date_rules, time_rules, sid, symbol,
set_slippage, slippage, set_commission, commission,
get_datetime, order_target_percent, record, attach_pipeline,
order_target, get_open_orders, history
)
def get_backtest_algo(s1, s2):
    """
    Build the zipline ``initialize`` callback for backtesting one pair.

    All algorithm callbacks are closures defined inside this function, so the
    pair is carried by the closure rather than by module globals.

    :param s1: first security of the pair (treated as the "y" leg)
    :param s2: second security of the pair (treated as the "x" leg)
    :return: the ``initialize(context)`` function
    """
    # Function-scope import: the file never defines the ``log`` object the
    # handler below used to call, so a standard-library logger is used.
    import logging

    logger = logging.getLogger(__name__)

    ALGO_STOCK_PAIRS = [[s1, s2]]
    #BENCHMARK = symbols('SPY')
    #UNIVERSE = list(itertools.chain([BENCHMARK], *ALGO_STOCK_PAIRS))
    UNIVERSE = list(itertools.chain(*ALGO_STOCK_PAIRS))

    def initialize(context):
        """Set up pair bookkeeping on ``context`` and schedule the daily check."""
        # Quantopian backtester specific variables
        #set_slippage(slippage.FixedSlippage(spread=0))
        #set_commission(commission.PerTrade(cost=1))
        #set_symbol_lookup_date('2014-01-01')
        context.stock_pairs = ALGO_STOCK_PAIRS
        context.universe = UNIVERSE
        # set_benchmark(context.y)
        context.num_pairs = len(context.stock_pairs)

        # strategy specific variables
        context.lookback = 20  # used for regression
        context.z_window = 20  # used for zscore calculation, must be <= lookback

        # One row per pair; a column is appended per completed daily check.
        context.spread = np.ndarray((context.num_pairs, 0))
        # context.hedgeRatioTS = np.ndarray((context.num_pairs, 0))
        context.inLong = [False] * context.num_pairs
        context.inShort = [False] * context.num_pairs

        # Only do work 30 minutes before close
        schedule_function(func=check_pair_status,
                          date_rule=date_rules.every_day(),
                          time_rule=time_rules.market_close(minutes=30))

    # Will be called on every trade event for the securities you specify.
    def handle_data(context, data):
        # Our work is now scheduled in check_pair_status
        pass

    def _close_pair(context, i, stock_y, stock_x):
        """Flatten both legs of pair ``i`` and clear its long/short flags."""
        order_target(stock_y, 0)
        order_target(stock_x, 0)
        context.inShort[i] = False
        context.inLong[i] = False
        record(X_pct=0, Y_pct=0)

    def _open_pair(context, stock_y, stock_x, y_shares, x_shares, y_price, x_price):
        """Order both legs at the target percentages implied by the share ratio."""
        (y_target_pct, x_target_pct) = computeHoldingsPct(
            y_shares, x_shares, y_price, x_price)
        # NOTE(review): the target is divided by num_pairs twice (1/n**2).
        # This function only ever builds one pair, so the factor is 1 here;
        # confirm the intent before reusing the sizing with several pairs.
        order_target_percent(
            stock_y,
            y_target_pct * (1.0 / context.num_pairs) / float(context.num_pairs))
        order_target_percent(
            stock_x,
            x_target_pct * (1.0 / context.num_pairs) / float(context.num_pairs))
        record(Y_pct=y_target_pct, X_pct=x_target_pct)

    def check_pair_status(context, data):
        """
        Daily check: update each pair's spread and trade on its z-score.

        Does nothing while orders are still open. The z-score is computed
        from the spreads stored by earlier calls; the spread computed in this
        call is only appended at the very end.
        """
        if get_open_orders():
            return

        prices = data.history(context.universe, 'price', 35, '1d').iloc[-context.lookback::]
        new_spreads = np.ndarray((context.num_pairs, 1))

        for i in range(context.num_pairs):
            (stock_y, stock_x) = context.stock_pairs[i]
            Y = prices[stock_y]
            X = prices[stock_x]

            try:
                hedge = hedge_ratio(Y, X, add_const=True)
            except ValueError as e:
                logger.debug(e)
                return

            # context.hedgeRatioTS = np.append(context.hedgeRatioTS, hedge)
            y_last = Y[-1]
            x_last = X[-1]
            new_spreads[i, :] = y_last - hedge * x_last

            if context.spread.shape[1] > context.z_window:
                # Keep only the z-score lookback period
                spreads = context.spread[i, -context.z_window:]
                zscore = (spreads[-1] - spreads.mean()) / spreads.std()

                # NOTE(review): every early ``return`` below leaves the
                # function before the spread history is extended, while the
                # last branch falls through to the append -- confirm that
                # this asymmetry is intended.
                if context.inShort[i] and zscore < 0.0:
                    _close_pair(context, i, stock_y, stock_x)
                    return

                if context.inLong[i] and zscore > 0.0:
                    _close_pair(context, i, stock_y, stock_x)
                    return

                if zscore < -1.0 and (not context.inLong[i]):
                    # Only trade if NOT already in a trade
                    context.inLong[i] = True
                    context.inShort[i] = False
                    _open_pair(context, stock_y, stock_x,
                               1, -hedge, y_last, x_last)
                    return

                if zscore > 1.0 and (not context.inShort[i]):
                    # Only trade if NOT already in a trade
                    context.inShort[i] = True
                    context.inLong[i] = False
                    _open_pair(context, stock_y, stock_x,
                               -1, hedge, y_last, x_last)

        context.spread = np.hstack([context.spread, new_spreads])

    def hedge_ratio(Y, X, add_const=True):
        """
        Regress ``Y`` on ``X`` with OLS and return the hedge ratio.

        With ``add_const`` a constant column is added to ``X`` and the
        parameter at position 1 is returned; otherwise the full parameter
        values are returned.
        """
        if add_const:
            X = sm.add_constant(X)
        model = sm.OLS(Y, X).fit()
        if add_const:
            return model.params[1]
        return model.params.values

    def computeHoldingsPct(yShares, xShares, yPrice, xPrice):
        """Return each leg's signed share of the pair's total absolute notional."""
        yDol = yShares * yPrice
        xDol = xShares * xPrice
        notionalDol = abs(yDol) + abs(xDol)
        y_target_pct = yDol / notionalDol
        x_target_pct = xDol / notionalDol
        return (y_target_pct, x_target_pct)

    return initialize
#get_backtest_algo(*STOCK_PAIRS[0])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment