Skip to content

Instantly share code, notes, and snippets.

@18182324
Last active February 1, 2023 18:57
Show Gist options
  • Save 18182324/9696633b2a910e59a07f8cecc6edfafe to your computer and use it in GitHub Desktop.
Save 18182324/9696633b2a910e59a07f8cecc6edfafe to your computer and use it in GitHub Desktop.
Pairs Trading S&P 500 Tickers
import pandas as pd
import numpy as np
from pandas_datareader import data as pdr
import statsmodels
from statsmodels.tsa.stattools import coint
import matplotlib.pyplot as plt
# Step 1: Fetch the S&P 500 constituent symbols from Wikipedia, then pull
# their adjusted closing prices from Yahoo Finance.
constituents = pd.read_html(
    "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
)[0]  # first table on the page; its "Symbol" column holds the tickers
tickers = constituents["Symbol"].tolist()
data = pdr.get_data_yahoo(
    tickers,
    start="2010-01-01",
    end="2022-12-31",
)["Adj Close"]
# Step 2: Test for cointegration
def find_cointegrated_pairs(data, significance=0.05, min_obs=30):
    """Run a pairwise cointegration test over every pair of columns in *data*.

    Every unordered pair of columns (i < j) is handed to ``coint``. Before
    the test, rows where either series is missing are dropped so the two
    series cover the same dates.
    NOTE(review): ``coint`` is understood not to drop missing values itself;
    confirm against the statsmodels documentation.
    NOTE(review): no multiple-comparison correction is applied, so with many
    columns some pairs will pass the threshold by chance.

    Args:
        data: DataFrame-like object with one price series per column.
        significance: p-value below which a pair is reported. Defaults to
            0.05, the threshold that was previously hard-coded.
        min_obs: minimum number of rows on which both series are present for
            the pair to be tested. Pairs with less overlap are skipped and
            keep the neutral matrix entries (score 0, p-value 1). The default
            of 30 is a heuristic floor; pass 0 to disable the check.

    Returns:
        A tuple ``(score_matrix, pvalue_matrix, pairs)``. Both matrices are
        ``n x n`` arrays of which only the upper triangle (i < j) is filled;
        ``pairs`` is a list of ``(column_i, column_j)`` tuples whose p-value
        is below ``significance``.
    """
    n = data.shape[1]
    score_matrix = np.zeros((n, n))
    pvalue_matrix = np.ones((n, n))
    keys = data.keys()
    pairs = []
    for i in range(n):
        first = data[keys[i]]  # invariant for the whole inner loop
        first_present = first.notna()
        for j in range(i + 1, n):
            second = data[keys[j]]
            both_present = first_present & second.notna()
            if both_present.sum() < min_obs:
                # Not enough common history to test this pair meaningfully.
                continue
            score, pvalue = coint(first[both_present], second[both_present])[:2]
            score_matrix[i, j] = score
            pvalue_matrix[i, j] = pvalue
            if pvalue < significance:
                pairs.append((keys[i], keys[j]))
    return score_matrix, pvalue_matrix, pairs
# Scan every unordered pair of downloaded columns; `pairs` keeps those whose
# cointegration p-value falls below the function's threshold.
scores, pvalues, pairs = find_cointegrated_pairs(data)
# Step 3: Backtest a pairs trading strategy with a combination of all the 500 tickers and sort the most profitable pairs in descending order
def backtest_pairs_trading(data, pairs):
    """Flag pairs whose latest spread is more than one sigma from its mean.

    For each pair the spread ``S1 - S2`` is standardized over the whole
    sample and only its final value is inspected. When that value is beyond
    +/-1, the relatively overpriced leg of the pair is recorded.

    NOTE(review): the recorded "Return" is the mean plus the standard
    deviation of the standardized spread. By construction that is ~0 + ~1
    for every pair, so it does not measure profitability and the final sort
    is close to arbitrary. The value is kept unchanged for compatibility.

    Args:
        data: DataFrame of prices, one column per ticker.
        pairs: iterable of ``(ticker_1, ticker_2)`` column-name pairs.

    Returns:
        DataFrame indexed by "Stock" with a single "Return" column, sorted by
        "Return" in descending order. Pairs with no signal are omitted.
    """
    signals = []
    for pair in pairs:
        spread = data[pair[0]] - data[pair[1]]
        if spread.empty:
            # No observations: there is no "latest" value to inspect.
            continue
        zscore = (spread - spread.mean()) / spread.std()
        score = zscore.mean() + zscore.std()
        latest = zscore.iloc[-1]
        if latest > 1.0:
            # Spread is high: the first leg is rich relative to the second.
            signals.append((pair[0], score))
        elif latest < -1.0:
            # Spread is low: the second leg is rich relative to the first.
            signals.append((pair[1], score))
        # A NaN `latest` (missing last price or zero-variance spread) fails
        # both comparisons, so such pairs produce no signal.
    returns = pd.DataFrame(signals, columns=["Stock", "Return"])
    return returns.set_index("Stock").sort_values("Return", ascending=False)
# NOTE(review): bare Ellipsis expression; a no-op at runtime, presumably a
# placeholder left over from the original listing.
...
# Run the backtest over the pairs kept by the cointegration scan.
returns = backtest_pairs_trading(data, pairs)
# Step 4: Plot a 50-bin histogram of the "Return" column (all rows, not a
# selected quantile)
plt.hist(returns["Return"], bins=50)
plt.xlabel("Return")
plt.ylabel("Frequency")
plt.title("Distribution of Returns")
plt.show()
# Step 5: Show the first 10 rows of the result, which the backtest returns
# sorted by "Return" in descending order
top_pairs = returns.head(10)
print("The 10 most profitable pairs are:")
print(top_pairs)
import pandas as pd
import numpy as np
from pandas_datareader import data as pdr
import statsmodels
from statsmodels.tsa.stattools import coint
import matplotlib.pyplot as plt
# Step 1: Fetch the S&P 500 constituent symbols from Wikipedia, then pull
# their adjusted closing prices from Yahoo Finance.
constituents = pd.read_html(
    "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
)[0]  # first table on the page; its "Symbol" column holds the tickers
tickers = constituents["Symbol"].tolist()
data = pdr.get_data_yahoo(
    tickers,
    start="2010-01-01",
    end="2022-12-31",
)["Adj Close"]
# Step 2: Test for cointegration
def find_cointegrated_pairs(data, significance=0.05, min_obs=30):
    """Run a pairwise cointegration test over every pair of columns in *data*.

    Every unordered pair of columns (i < j) is handed to ``coint``. Before
    the test, rows where either series is missing are dropped so the two
    series cover the same dates.
    NOTE(review): ``coint`` is understood not to drop missing values itself;
    confirm against the statsmodels documentation.
    NOTE(review): no multiple-comparison correction is applied, so with many
    columns some pairs will pass the threshold by chance.

    Args:
        data: DataFrame-like object with one price series per column.
        significance: p-value below which a pair is reported. Defaults to
            0.05, the threshold that was previously hard-coded.
        min_obs: minimum number of rows on which both series are present for
            the pair to be tested. Pairs with less overlap are skipped and
            keep the neutral matrix entries (score 0, p-value 1). The default
            of 30 is a heuristic floor; pass 0 to disable the check.

    Returns:
        A tuple ``(score_matrix, pvalue_matrix, pairs)``. Both matrices are
        ``n x n`` arrays of which only the upper triangle (i < j) is filled;
        ``pairs`` is a list of ``(column_i, column_j)`` tuples whose p-value
        is below ``significance``.
    """
    n = data.shape[1]
    score_matrix = np.zeros((n, n))
    pvalue_matrix = np.ones((n, n))
    keys = data.keys()
    pairs = []
    for i in range(n):
        first = data[keys[i]]  # invariant for the whole inner loop
        first_present = first.notna()
        for j in range(i + 1, n):
            second = data[keys[j]]
            both_present = first_present & second.notna()
            if both_present.sum() < min_obs:
                # Not enough common history to test this pair meaningfully.
                continue
            score, pvalue = coint(first[both_present], second[both_present])[:2]
            score_matrix[i, j] = score
            pvalue_matrix[i, j] = pvalue
            if pvalue < significance:
                pairs.append((keys[i], keys[j]))
    return score_matrix, pvalue_matrix, pairs
# Scan every unordered pair of downloaded columns; `pairs` keeps those whose
# cointegration p-value falls below the function's threshold.
scores, pvalues, pairs = find_cointegrated_pairs(data)
# Step 3: Backtest a pairs trading strategy with a combination of all the 500 tickers and sort the pairs by return in descending order
def backtest_pairs_trading(data, pairs, spread_score=0):
    """Simulate a z-score position rule for each pair and rank the pairs.

    For every pair the spread ``S1 - spread_score * S2`` is standardized over
    the whole sample. A position in {1, 0, -1} is then stepped through the
    rows: it is opened on entry signals, flattened on exit signals and
    flipped when the opposite entry fires while a position is open. The
    score reported per pair is ``sum(position * (S1 - S2) / S2)``.

    All row access is positional, so the function works regardless of the
    index type of *data* (dates, non-zero-based integers, ...).

    NOTE(review): with the default ``spread_score=0`` the spread is just
    ``S1``; the second leg only enters through the final score.
    NOTE(review): the entry threshold is the mean of the standardized spread,
    which is ~0 by construction, so nearly every row is an entry signal.
    NOTE(review): the score sums price-ratio levels rather than period
    returns. These are kept as-is for compatibility; confirm the intent.

    Args:
        data: DataFrame of prices, one column per ticker.
        pairs: iterable of ``(ticker_1, ticker_2)`` column-name pairs.
        spread_score: multiplier applied to the second leg when forming the
            spread (a hedge ratio). Defaults to 0.

    Returns:
        DataFrame with columns "Pair" and "Return", sorted by "Return" in
        descending order. A pair with no observations scores 0.
    """
    results = []
    for pair in pairs:
        S1 = data[pair[0]]
        S2 = data[pair[1]]
        spread = S1 - spread_score * S2
        zscore = (spread - spread.mean()) / spread.std()

        entry_zscore = zscore.mean()
        exit_zscore = entry_zscore + 0.5

        # Plain boolean arrays so the loop below indexes by position only.
        long_entries = (zscore < -entry_zscore).to_numpy()
        short_entries = (zscore > entry_zscore).to_numpy()
        long_exits = (zscore > exit_zscore).to_numpy()
        short_exits = (zscore < -exit_zscore).to_numpy()

        n_obs = len(spread)
        pos = np.zeros(n_obs)
        if n_obs:
            # The first row always starts with a position: long when the
            # spread is below the entry band, otherwise short.
            pos[0] = 1 if zscore.iloc[0] < -entry_zscore else -1
        for i in range(1, n_obs):
            previous = pos[i - 1]
            if previous == 1:
                if long_exits[i]:
                    pos[i] = 0
                elif short_entries[i]:
                    pos[i] = -1
                else:
                    pos[i] = 1
            elif previous == -1:
                if short_exits[i]:
                    pos[i] = 0
                elif long_entries[i]:
                    pos[i] = 1
                else:
                    pos[i] = -1
            elif long_entries[i]:
                pos[i] = 1
            elif short_entries[i]:
                pos[i] = -1
            # otherwise stay flat (pos[i] is already 0)
        results.append((pair, np.sum(pos * (S1 - S2) / S2)))
    returns = pd.DataFrame(results, columns=["Pair", "Return"])
    return returns.sort_values(by="Return", ascending=False)
# Run the backtest with the default spread_score (0), so the spread reduces
# to the first leg's price series. NOTE(review): confirm this default is
# intended rather than an estimated hedge ratio.
returns = backtest_pairs_trading(data, pairs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment