Skip to content

Instantly share code, notes, and snippets.

Forked from dat-boris/
Created February 28, 2017 05:18
Show Gist options
  • Save rterbush/983fd849b7254197ce8d4e257d3b67e7 to your computer and use it in GitHub Desktop.
Save rterbush/983fd849b7254197ce8d4e257d3b67e7 to your computer and use it in GitHub Desktop.
Quantopian tool for researching pair-trading algorithms
A simple tool for researching pairs, based on:
from datetime import date, timedelta
import matplotlib.pyplot as plt
from statsmodels.tsa.stattools import coint
from zipline import TradingAlgorithm
import pyfolio as pf
# CROSS_VALIDATION_BOUNDARY = [[start1, end1]]
LOOKBACK_WINDOW = timedelta(days=60)
class Pair(object):
def __init__(self, s1, s2, validation_segment=0):
    """Bind a candidate pair to one cross-validation segment.

    Args:
        s1: first security of the pair.
        s2: second security of the pair.
        validation_segment: index into ``CROSS_VALIDATION_BOUNDARY``
            selecting the ``[start, end]`` window this instance works on.
    """
    self.s1, self.s2 = s1, s2
    self.validation_segment = validation_segment
    boundary = CROSS_VALIDATION_BOUNDARY[validation_segment]
    self.start_date, self.end_date = boundary
    # One pricing slot per segment; the slot for the active segment is
    # filled in by get_pricing().
    self.pricing = [None for _ in CROSS_VALIDATION_BOUNDARY]
def get_pricing(self):
# Fetches prices for both securities of the pair, stores them in the
# self.pricing slot of the current validation segment, and returns self.
# The bare get_pricing(...) call below resolves to a module-level/global
# function, not to this method (a method is not reachable by its bare name).
# NOTE(review): presumably the Quantopian research-environment get_pricing
# helper -- it is not imported anywhere in this excerpt; confirm.
self.pricing[self.validation_segment] = get_pricing(
[self.s1, self.s2],
# Start LOOKBACK_WINDOW (60 days, see the module constant) before the
# segment start so that data preceding the segment is also fetched.
start_date=self.start_date - LOOKBACK_WINDOW,
# ensure that we dont fall on holiday
# NOTE(review): the remaining arguments of this call and its closing
# parenthesis are not visible in this excerpt; restore them before use.
return self
def plot(self):
# NOTE(review): no body for this method is visible in this excerpt, so what it
# draws cannot be documented here; restore the body before use.
def coint_test(self, plot=True):
    """Run a cointegration test on the pair's prices for the active segment.

    For a good pair, we should be able to see a very low co-integration
    p-value (i.e. < 0.05).

    Requires that pricing for the active validation segment has already
    been loaded into ``self.pricing`` (see ``get_pricing``).

    Args:
        plot: when true, draw the price spread (``s2`` minus ``s1``) together
            with a dashed red line at its mean.

    Returns:
        The p-value returned by ``coint`` for the two price series.
    """
    pricing_data = self.pricing[self.validation_segment]
    X = pricing_data[self.s1]
    Y = pricing_data[self.s2]
    if plot:
        spread = Y - X
        spread.plot()  # Plot the spread
        plt.axhline(spread.mean(), color='red', linestyle='--')  # Add the mean
        plt.legend(['Price Spread', 'Mean'])
    # compute the p-value of the cointegration test
    # will inform us as to whether the spread between the 2 timeseries is
    # stationary around its mean
    _, pvalue, _ = coint(X, Y)
    # The format string has five placeholders; the first two ("{} to {}")
    # are filled with the segment boundaries.
    # NOTE(review): these two arguments were empty in the source text and
    # are reconstructed from the "({} to {})" wording -- confirm.
    # Parenthesised single-argument print works on both Python 2 and 3.
    print("Coint ({} to {}) {} vs {}: {}".format(
        self.start_date, self.end_date, self.s1, self.s2, pvalue))
    return pvalue
def test_trading(self):
# Backtests the pair on the current validation segment, prints the Sharpe
# ratio, stores the outcome on self.results / self.sharpe and returns the
# tuple (results, sharpe).
# NOTE(review): several lines of this method are not visible in this excerpt
# (the TradingAlgorithm(...) arguments and closing parenthesis, and the call
# whose value is assigned to `results`), so the statements below are
# incomplete as shown; restore them before use.
# get_backtest_algo is defined later in this file and returns the
# algorithm's `initialize` callback.
algo_initialize = get_backtest_algo(self.s1, self.s2)
# see (reference link missing from this excerpt) for setup
algo_obj = TradingAlgorithm(
# Run algorithms
# Pricing for the active segment must already have been loaded by get_pricing().
pricing_data = self.pricing[self.validation_segment]
results =
pricing_data, #.transpose(2,1,0),
# Sharpe ratio: mean return scaled by 252 over return std scaled by sqrt(252).
# NOTE(review): presumably annualising daily returns with 252 trading days
# per year -- confirm the return frequency of `results`.
# `np` is imported further down in this file (module level).
sharpe = (results.returns.mean()*252)/(results.returns.std() * np.sqrt(252))
print "The Sharpe ratio is %0.6f" % sharpe
# Kept on the instance so tearsheet_from_results() can reuse the backtest output.
self.results = results
self.sharpe = sharpe
return results, sharpe
def tearsheet_from_results(self, simple=True):
# Feeds the backtest output stored by test_trading() (self.results) to pyfolio.
# NOTE(review): the rest of this method is not visible in this excerpt -- the
# call below is cut off after `positions=positions,` and no other branch is
# shown -- so the effect of `simple` cannot be fully documented here.
results = self.results
# NOTE(review): four values are unpacked here; newer pyfolio releases are
# believed to return three (no gross_lev) -- confirm against the installed version.
algo_returns, positions, algo_transactions, gross_lev = pf.utils.extract_rets_pos_txn_from_zipline(results)
if simple:
# NOTE(review): as shown, the `simple` flag leads to create_full_tear_sheet;
# a line may be missing between the `if` and this call -- confirm.
pf.create_full_tear_sheet(algo_returns, positions=positions,
# test_pair = Pair(*STOCK_PAIRS[1])
# test_pair.get_pricing()
# #test_pair.plot()
# # should see a low value
# #test_pair.coint_test()
# results, sharpe = test_pair.test_trading()
# source from
import numpy as np
import statsmodels.api as sm
import pandas as pd
from zipline.utils import tradingcalendar
import pytz
import itertools
from zipline.api import (
schedule_function, date_rules, time_rules, sid, symbol,
set_slippage, slippage, set_commission, commission,
get_datetime, order_target_percent, record, attach_pipeline,
order_target, get_open_orders, history
def get_backtest_algo(s1, s2):
# Builds the zipline callbacks for a single pair; the nested functions that
# follow close over ALGO_STOCK_PAIRS / UNIVERSE, and the function ends by
# returning the `initialize` callback.
# NOTE(review): the next line reads as this function's docstring; its quote
# delimiters are not present in this excerpt.
Setting up the algorithm for real testing
# One pair only. check_pair_status unpacks each entry as (stock_y, stock_x),
# so s1 is treated as Y and s2 as X.
ALGO_STOCK_PAIRS = [[s1,s2]]
#BENCHMARK = symbols('SPY')
#UNIVERSE = list(itertools.chain([BENCHMARK], *ALGO_STOCK_PAIRS))
# Flatten the list of pairs into a single list of securities.
UNIVERSE = list(itertools.chain(*ALGO_STOCK_PAIRS))
def initialize(context):
    """Set up per-pair state on ``context`` and schedule the daily check.

    Args:
        context: the algorithm context object; every attribute used by
            ``check_pair_status`` is created here.
    """
    # Quantopian backtester specific variables
    context.stock_pairs = ALGO_STOCK_PAIRS
    context.universe = UNIVERSE
    # set_benchmark(context.y)

    pair_count = len(context.stock_pairs)
    context.num_pairs = pair_count

    # strategy specific variables
    context.lookback = 20  # used for regression
    context.z_window = 20  # used for zscore calculation, must be <= lookback

    # Spread history: one row per pair, no columns yet (one is appended per run).
    context.spread = np.ndarray(shape=(pair_count, 0))
    # context.hedgeRatioTS = np.ndarray((context.num_pairs, 0))

    # Position flags, one entry per pair.
    context.inLong = [False for _ in range(pair_count)]
    context.inShort = [False for _ in range(pair_count)]

    # Only do work 30 minutes before close
    schedule_function(
        func=check_pair_status,
        date_rule=date_rules.every_day(),
        time_rule=time_rules.market_close(minutes=30),
    )
# Will be called on every trade event for the securities you specify.
def handle_data(context, data):
    """Per-event hook; intentionally does nothing.

    Args:
        context: the algorithm context object (unused).
        data: the current bar data (unused).

    Returns:
        None.
    """
    # Our work is now scheduled in check_pair_status
def check_pair_status(context, data):
# Daily pairs-trading step (scheduled from initialize, 30 minutes before the
# close): for every pair it estimates a hedge ratio, records the current
# spread, computes a z-score from the stored spread history and
# closes/opens positions on z-score thresholds.
# NOTE(review): some lines of this function are not visible in this excerpt
# (the statement guarded by `if get_open_orders():`, the `try:` that pairs
# with the `except` below, the body of that `except`, and any bare statements
# such as an early return at the end of each trading branch). The control
# flow described here is therefore limited to the lines that are shown.
# NOTE(review): presumably this skips the step while orders are still open -- confirm.
if get_open_orders():
# Request 35 daily 'price' bars, then keep only the last `lookback` rows.
prices = data.history(context.universe, 'price', 35, '1d').iloc[-context.lookback::]
# One new spread value per pair; np.ndarray leaves the contents
# uninitialised until the loop below writes them.
new_spreads = np.ndarray((context.num_pairs, 1))
for i in range(context.num_pairs):
(stock_y, stock_x) = context.stock_pairs[i]
Y = prices[stock_y]
X = prices[stock_x]
# Slope of the regression of Y on X (with an intercept term).
hedge = hedge_ratio(Y, X, add_const=True)
except ValueError as e:
# context.hedgeRatioTS = np.append(context.hedgeRatioTS, hedge)
# Current spread: latest Y price minus hedge ratio times latest X price.
new_spreads[i, :] = Y[-1] - hedge * X[-1]
# Trade only once more than z_window spreads have been stored.
if context.spread.shape[1] > context.z_window:
# Keep only the z-score lookback period
spreads = context.spread[i, -context.z_window:]
# context.spread does not yet contain the value just written to new_spreads
# (it is appended at the end of this function), so the z-score is based on
# the most recent previously stored spread.
zscore = (spreads[-1] - spreads.mean()) / spreads.std()
# Exit a short-spread position once the z-score is below zero.
if context.inShort[i] and zscore < 0.0:
order_target(stock_y, 0)
order_target(stock_x, 0)
context.inShort[i] = False
context.inLong[i] = False
record(X_pct=0, Y_pct=0)
# Exit a long-spread position once the z-score is above zero.
if context.inLong[i] and zscore > 0.0:
order_target(stock_y, 0)
order_target(stock_x, 0)
context.inShort[i] = False
context.inLong[i] = False
record(X_pct=0, Y_pct=0)
# Enter long the spread (long Y, short hedge * X) when the z-score is below -1.
if zscore < -1.0 and (not context.inLong[i]):
# Only trade if NOT already in a trade
y_target_shares = 1
X_target_shares = -hedge
context.inLong[i] = True
context.inShort[i] = False
(y_target_pct, x_target_pct) = computeHoldingsPct( y_target_shares,X_target_shares, Y[-1], X[-1] )
# NOTE(review): the target is scaled by 1/num_pairs twice (multiplied by
# 1.0/num_pairs and then divided by num_pairs). With the single pair set up in
# get_backtest_algo this has no effect; with N pairs each would get 1/N**2 --
# confirm whether that is intended.
order_target_percent( stock_y, y_target_pct * (1.0/context.num_pairs) / float(context.num_pairs) )
order_target_percent( stock_x, x_target_pct * (1.0/context.num_pairs) / float(context.num_pairs) )
record(Y_pct=y_target_pct, X_pct=x_target_pct)
# Enter short the spread (short Y, long hedge * X) when the z-score is above 1.
if zscore > 1.0 and (not context.inShort[i]):
# Only trade if NOT already in a trade
y_target_shares = -1
X_target_shares = hedge
context.inShort[i] = True
context.inLong[i] = False
(y_target_pct, x_target_pct) = computeHoldingsPct( y_target_shares, X_target_shares, Y[-1], X[-1] )
# Same double 1/num_pairs scaling as in the long entry above.
order_target_percent( stock_y, y_target_pct * (1.0/context.num_pairs) / float(context.num_pairs) )
order_target_percent( stock_x, x_target_pct * (1.0/context.num_pairs) / float(context.num_pairs) )
record(Y_pct=y_target_pct, X_pct=x_target_pct)
# Append this run's spreads as a new column of the history.
context.spread = np.hstack([context.spread, new_spreads])
def hedge_ratio(Y, X, add_const=True):
    """Estimate the hedge ratio by regressing ``Y`` on ``X`` with OLS.

    Args:
        Y: dependent price series.
        X: explanatory price series.
        add_const: when true, a constant column is added to ``X`` via
            ``sm.add_constant`` and only the coefficient at position 1 is
            returned; otherwise the regression has no added constant and
            all fitted parameters are returned.

    Returns:
        With ``add_const`` true: the fitted coefficient at position 1 (the
        slope on ``X``, given that the constant occupies position 0).
        Otherwise: an array of all fitted parameters.
    """
    if add_const:
        X = sm.add_constant(X)
        model = sm.OLS(Y, X).fit()
        # params is a label-indexed Series for pandas inputs and a plain
        # array for NumPy inputs; np.asarray gives positional access for
        # both without relying on Series integer-position fallback.
        return np.asarray(model.params)[1]
    model = sm.OLS(Y, X).fit()
    # Same array as `.values` for a Series, but also works for array params.
    return np.asarray(model.params)
def computeHoldingsPct(yShares, xShares, yPrice, xPrice):
    """Convert share targets for the two legs into signed portfolio fractions.

    Each leg's dollar value (shares * price) is divided by the total
    absolute dollar value of both legs, so the absolute values of the two
    results sum to 1 and each keeps the sign of its share count.

    Args:
        yShares: signed target share count for the Y leg.
        xShares: signed target share count for the X leg.
        yPrice: price of the Y security.
        xPrice: price of the X security.

    Returns:
        Tuple ``(y_target_pct, x_target_pct)``. When both legs have zero
        dollar value the result is ``(0.0, 0.0)`` instead of a division
        by zero.
    """
    yDol = yShares * yPrice
    xDol = xShares * xPrice
    # float() forces true division even for integer inputs on Python 2.
    notionalDol = float(abs(yDol) + abs(xDol))
    if notionalDol == 0:
        # Nothing to allocate: avoid ZeroDivisionError / NaN targets.
        return (0.0, 0.0)
    y_target_pct = yDol / notionalDol
    x_target_pct = xDol / notionalDol
    return (y_target_pct, x_target_pct)
return initialize
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment