Skip to content

Instantly share code, notes, and snippets.

@GrovesD2
Last active April 20, 2023 17:49
Show Gist options
  • Save GrovesD2/f9fb2d7f074500b902caae9360385260 to your computer and use it in GitHub Desktop.
Save GrovesD2/f9fb2d7f074500b902caae9360385260 to your computer and use it in GitHub Desktop.
import time
import numpy as np
import numba as nb
import pandas as pd
import yfinance as yf
import plotly.graph_objects as go
import plotly.io as pio
pio.renderers.default='svg'
from copy import deepcopy
from typing import Tuple
from skopt import gp_minimize
from skopt.utils import use_named_args
from skopt.space import Real, Integer, Categorical
# Price history that every backtest in this script is run against
DF = yf.download('QQQ').reset_index()

# Statistic the optimiser tries to maximise. Recognised values:
# - 'sharpe ratio'
# - 'mean trade'
# - 'median trade'
# - 'compounded result'
OBJ_TYPE = 'sharpe ratio'

TRAIN_FROM = '2017-01-01'  # First date included in the training window
TEST_FROM = '2021-01-01'  # Last date of training; later dates form the test set

# Parameter sets producing too few trades are penalised, which stops the
# optimiser from overfitting to a handful of perfect trades
MIN_TRADES = 30

N_ITERS = 100  # Number of objective evaluations the optimiser performs
VERBOSE = False  # Whether the optimiser prints runtime info

# Choices shared by several categorical dimensions of the search space
_AVERAGE_KINDS = ['sma', 'ema']
_PRICE_FIELDS = ['Open', 'Low', 'High', 'Close']

# Search space explored by the optimiser
SPACE = [
    Integer(3, 300, name='bb_length'),
    Integer(3, 300, name='mean_length'),
    # NOTE: This is the max time we remain in a trade
    Integer(2, 50, name='max_trade_length'),
    Real(0.2, 5, name='std'),
    Categorical(_AVERAGE_KINDS, name='mean_type'),
    Categorical(_AVERAGE_KINDS, name='bb_type'),
    Categorical(_PRICE_FIELDS, name='mean_price'),
    Categorical(_PRICE_FIELDS, name='bb_price'),
    Categorical(_PRICE_FIELDS, name='entry_price'),
]

# Hand-picked reference strategy the optimised one is compared against
BENCHMARK_STRAT = dict(
    bb_length=30,
    mean_length=30,
    max_trade_length=10,
    std=1,
    mean_type='sma',
    bb_type='sma',
    mean_price='Close',
    bb_price='Close',
    entry_price='Close',
)
def get_mean_and_bb(
    df: pd.DataFrame,
    mean_type: str,
    mean_price: str,
    mean_length: int,
    bb_type: str,
    std: float,
    bb_price: str,
    bb_length: int,
) -> pd.DataFrame:
    '''
    Add columns to the price dataframe for the mean (or fair price) and the
    lower bollinger band to signal a mean reversion trade.

    Parameters
    ----------
    df : price dataframe; it is modified in place (callers that need the
        original untouched should pass a copy) and also returned
    mean_type : 'ema' (or the legacy alias 'exp') for an exponential moving
        average, anything else for a simple rolling average
    mean_price : column of `df` the mean is computed from
    mean_length : window length (simple) or span (exponential) of the mean
    bb_type : 'ema'/'exp' for an exponentially weighted standard deviation,
        anything else for a simple rolling standard deviation
    std : number of standard deviations the band sits below the mean
    bb_price : column of `df` the standard deviation is computed from
    bb_length : window length (simple) or span (exponential) of the deviation

    Returns
    -------
    The same dataframe with the extra columns 'mean' and 'bb'.
    '''
    # The search space and benchmark label the exponential variant 'ema';
    # 'exp' is still accepted so existing callers keep working.
    exponential = ('ema', 'exp')

    mean_source = df[mean_price]
    if mean_type in exponential:
        # The length is passed as `span` explicitly: the first positional
        # argument of `ewm` is `com`, which gives a different decay rate.
        df['mean'] = mean_source.ewm(span=mean_length, adjust=False).mean()
    else:
        df['mean'] = mean_source.rolling(mean_length).mean()

    band_source = df[bb_price]
    if bb_type in exponential:
        deviation = band_source.ewm(span=bb_length, adjust=False).std()
    else:
        deviation = band_source.rolling(bb_length).std()

    # Lower band: `std` deviations below the mean
    df['bb'] = df['mean'] - std*deviation
    return df
@nb.jit(nopython = True)
def run_backtest(
    entry_prices: np.array,
    open_prices: np.array,
    high_prices: np.array,
    bb: np.array,
    mean: np.array,
    max_trade_length: int,
) -> Tuple[np.array, np.array, np.array]:
    '''
    Simulate the strategy bar by bar, holding at most one position at a time.

    Signals are read from the previous bar and acted on at the current bar,
    so the bands and the mean never use information from the bar being traded.

    - Entry: when flat and the previous bar's entry price is at or below the
      previous bar's lower band, buy at the current open.
    - Target exit: when the current high reaches the previous bar's mean, the
      trade is booked at that mean.
    - Timed exit: when the held-bars counter equals `max_trade_length`, the
      trade is booked at the current open.

    Returns the per-trade fractional returns, the bar indices of the entries
    and the bar indices of the exits. A position still open on the last bar
    contributes an entry index but no return or exit index.
    '''
    returns = []
    entry_days = []
    exit_days = []
    in_position = False

    for today in range(2, open_prices.shape[0]):
        yesterday = today - 1

        if in_position:
            if high_prices[today] >= mean[yesterday]:
                # Price reached the fair value: book the trade at the mean
                returns.append(mean[yesterday]/fill_price - 1)
                exit_days.append(today)
                in_position = False
            elif bars_held == max_trade_length:
                # Held for the maximum allowed time: close at the open
                returns.append(open_prices[today]/fill_price - 1)
                exit_days.append(today)
                in_position = False
            else:
                bars_held += 1
        elif entry_prices[yesterday] <= bb[yesterday]:
            # Yesterday closed the signal, so buy at today's open
            fill_price = open_prices[today]
            entry_days.append(today)
            bars_held = 1
            in_position = True

    return np.array(returns), np.array(entry_days), np.array(exit_days)
@use_named_args(SPACE)
def objective(
    bb_length: int,
    mean_length: int,
    max_trade_length: int,
    std: float,
    mean_type: str,
    bb_type: str,
    mean_price: str,
    bb_price: str,
    entry_price: str,
):
    '''
    Objective function for the optimiser: backtest one parameter set on the
    training window and return the negated score selected by OBJ_TYPE.

    NOTE - the score is negated because the optimiser is a minimiser (i.e. to
    maximise success, we need to minimise negative success).

    Returns
    -------
    float : the negated score, or a large positive penalty (1e6) when fewer
        than MIN_TRADES trades were made or the score is not defined.

    Raises
    ------
    ValueError : if OBJ_TYPE is not one of the recognised objective names.
    '''
    # Value returned for parameter sets the optimiser should steer away from
    penalty = 1e6

    df = get_mean_and_bb(
        deepcopy(DF),
        mean_type,
        mean_price,
        mean_length,
        bb_type,
        std,
        bb_price,
        bb_length,
    )
    df = df.dropna().reset_index(drop=True)

    # Only optimise on the training set so that the test set remains unseen
    # and can be used to check that the optimiser is working
    in_train = (df['Date'] >= TRAIN_FROM) & (df['Date'] <= TEST_FROM)
    df = df[in_train]

    # Run the backtest to generate the list of trades
    trades, _, _ = run_backtest(
        entry_prices=df[entry_price].values.astype(np.float64),
        open_prices=df['Open'].values.astype(np.float64),
        high_prices=df['High'].values.astype(np.float64),
        bb=df['bb'].values.astype(np.float64),
        mean=df['mean'].values.astype(np.float64),
        max_trade_length=max_trade_length,
    )

    # Too few trades: penalise so the optimiser does not overfit to a small
    # number of perfect trades
    if trades.shape[0] < MIN_TRADES:
        return penalty

    # Express the fractional trade returns as percentages
    trades = trades*100

    if OBJ_TYPE == 'sharpe ratio':
        spread = np.std(trades)
        if spread == 0:
            # Identical trades make the ratio undefined; a non-finite value
            # must not be handed back to the optimiser
            return penalty
        score = np.sqrt(255)*np.mean(trades)/spread
    elif OBJ_TYPE == 'mean trade':
        score = np.mean(trades)
    elif OBJ_TYPE == 'median trade':
        score = np.median(trades)
    elif OBJ_TYPE == 'compounded result':
        score = 1
        for trade in trades:
            score = (1+trade/100)*score
    else:
        raise ValueError(
            f'Unknown OBJ_TYPE {OBJ_TYPE!r}; expected one of: '
            "'sharpe ratio', 'mean trade', 'median trade', 'compounded result'"
        )

    return -score
def evaluate_strategy(
    bb_length: int,
    mean_length: int,
    max_trade_length: int,
    std: float,
    mean_type: str,
    bb_type: str,
    mean_price: str,
    bb_price: str,
    entry_price: str,
) -> Tuple[dict, dict]:
    '''
    Backtest one parameter set on both the training and the testing window.

    Mirrors the steps of the optimiser's objective, but keeps the full
    backtest output. Each returned dictionary holds the window's dataframe
    ('df'), the trade results ('res') and the entry/exit bar indices
    ('bought_on', 'sold_on'); the first dictionary is for the training
    window, the second for the testing window.
    '''
    def backtest_window(window: pd.DataFrame) -> dict:
        # Run the backtest on one window and bundle its output
        def column(name: str) -> np.ndarray:
            return window[name].values.astype(np.float64)

        res, bought_on, sold_on = run_backtest(
            entry_prices=column(entry_price),
            open_prices=column('Open'),
            high_prices=column('High'),
            bb=column('bb'),
            mean=column('mean'),
            max_trade_length=max_trade_length,
        )
        return {
            'df': window,
            'res': res,
            'bought_on': bought_on,
            'sold_on': sold_on,
        }

    priced = get_mean_and_bb(
        deepcopy(DF),
        mean_type,
        mean_price,
        mean_length,
        bb_type,
        std,
        bb_price,
        bb_length,
    ).dropna()

    # Training runs up to and including TEST_FROM; testing is strictly after
    dates = priced['Date']
    in_train = (dates >= TRAIN_FROM) & (dates <= TEST_FROM)
    in_test = dates > TEST_FROM

    train = backtest_window(priced[in_train].reset_index(drop=True))
    test = backtest_window(priced[in_test].reset_index(drop=True))
    return train, test
def get_equity_curve(case: dict) -> Tuple[np.ndarray, np.ndarray]:
    '''
    Generate the equity curve from the backtest, and output the date and the
    equity at that point in time (for plotting).

    Parameters
    ----------
    case : dictionary with the keys
        'df' (dataframe with a 'Date' column),
        'res' (fractional return of each closed trade) and
        'sold_on' (row index in 'df' at which each trade was closed).

    Returns
    -------
    (dates, equity) : one entry per row of 'df'; equity is the multiple of
        the initial investment, starting at 1.
    '''
    # Determine the compounded growth, starting from an investment of 1
    equity = [1.0]
    for trade in case['res']:
        equity.append((1+trade)*equity[-1])

    df = case['df']
    dates = df['Date'].values

    # An empty array of exits may arrive with a float dtype, which cannot be
    # used for indexing, so force integer indices
    sold_on = np.asarray(case['sold_on'], dtype=np.int64)

    # The starting equity is pinned to the second row; every later point is
    # the equity right after a trade was closed
    equity_dates = np.concatenate([
        np.array([dates[1]]),
        dates[sold_on],
    ])
    equity_df = pd.DataFrame({
        'Date': equity_dates,
        'equity': equity,
    })

    # Join the equity information back onto the df and fill the gaps so that
    # every date has a value: backfill first, then forward fill the tail.
    # `bfill`/`ffill` replace the deprecated `fillna(method=...)`.
    merged = df.merge(equity_df, on='Date', how='left')
    filled = merged['equity'].bfill().ffill()
    return merged['Date'].values, filled.values
def plot_equity_curve(
    optimised: dict,
    benchmark: dict,
    title: str
):
    '''
    Draw the equity curves of the optimised and the benchmark strategy on a
    single figure and show it.
    '''
    dates, equity = get_equity_curve(optimised)
    b_dates, b_equity = get_equity_curve(benchmark)

    # Benchmark is added first so the trace order stays the same
    curves = (
        ('Benchmark Strategy', b_dates, b_equity),
        ('Optimised Strategy', dates, equity),
    )

    fig = go.Figure()
    for label, xs, ys in curves:
        fig.add_trace(go.Scatter(x=xs, y=ys, name=label))

    layout = dict(
        yaxis={'title': 'Multiple from Initial Investment'},
        xaxis={'title': 'Date'},
        legend={'x': 0, 'y': -0.1, 'orientation': 'h'},
        margin={'l': 50, 'r': 50, 'b': 50, 't': 25},
        width=600,
        height=600,
        title=title,
    )
    fig.update_layout(**layout)

    fig.show()
    return
def print_stats(trades: np.array):
    '''
    Print summary statistics of a list of trades (mean, median, Sharpe ratio
    and compounded return) so the optimised strategy can be compared with
    the benchmark.
    '''
    # Compound every trade onto an initial investment of 1
    growth = 1
    for pct in trades:
        growth = (1+pct)*growth

    average = np.mean(trades)
    report = (
        ('- Mean trade:', average),
        ('- Median trade:', np.median(trades)),
        ('- Sharpe ratio:', np.sqrt(255)*average/np.std(trades)),
        ('- Compounded return:', growth),
    )
    for label, value in report:
        print(label, value)
    return
if __name__ == '__main__':
    # Optimise the strategy on the training window, then compare it with the
    # benchmark strategy on both the training and the testing window.
    started = time.perf_counter()

    print('Starting the optimisation\n\n')
    result = gp_minimize(
        objective,
        SPACE,
        n_calls=N_ITERS,
        random_state=0,
        verbose=VERBOSE,
        n_jobs=-1,
    )

    # `result.x` is ordered like SPACE, so label each value with the name of
    # its own dimension instead of relying on BENCHMARK_STRAT's key order
    optim_params = {dim.name: value for dim, value in zip(SPACE, result.x)}

    # Pass parameters by name so the calls do not depend on dictionary order
    b_train, b_test = evaluate_strategy(**BENCHMARK_STRAT)
    train, test = evaluate_strategy(**optim_params)

    print('Training Set\n------------\n')
    print('Benchmark')
    print_stats(b_train['res'])
    print('\nOptimised')
    print_stats(train['res'])

    print('\n\nTesting Set\n-----------\n')
    print('Benchmark')
    print_stats(b_test['res'])
    print('\nOptimised')
    print_stats(test['res'])

    plot_equity_curve(train, b_train, 'Training Equity Curve')
    plot_equity_curve(test, b_test, 'Testing Equity Curve')

    print('\nOptimised parameters:', optim_params)
    print('\nTotal time taken:', time.perf_counter()-started)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment