Last active
April 20, 2023 17:49
-
-
Save GrovesD2/f9fb2d7f074500b902caae9360385260 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import numpy as np | |
import numba as nb | |
import pandas as pd | |
import yfinance as yf | |
import plotly.graph_objects as go | |
import plotly.io as pio | |
pio.renderers.default='svg' | |
from copy import deepcopy | |
from typing import Tuple | |
from skopt import gp_minimize | |
from skopt.utils import use_named_args | |
from skopt.space import Real, Integer, Categorical | |
# Price history that every strategy evaluation is run against
DF = yf.download('QQQ').reset_index()

# Metric that the optimiser tries to maximise. Recognised values:
#   - 'sharpe ratio'
#   - 'mean trade'
#   - 'median trade'
#   - 'compounded result'
OBJ_TYPE = 'sharpe ratio'

TRAIN_FROM = '2017-01-01'  # First date of the training window
TEST_FROM = '2021-01-01'  # Last training date; testing covers later dates

# Parameter sets that do not generate enough trades are penalised by the
# objective, which discourages overfitting to a handful of perfect trades
MIN_TRADES = 30

N_ITERS = 100  # Number of optimiser iterations to run
VERBOSE = False  # Whether the optimiser prints runtime information

# Category lists shared by several search dimensions
_AVERAGE_KINDS = ['sma', 'ema']
_PRICE_COLUMNS = ['Open', 'Low', 'High', 'Close']

# Search space explored by the optimiser
SPACE = [
    Integer(3, 300, name='bb_length'),
    Integer(3, 300, name='mean_length'),
    # NOTE: This is the longest time we are allowed to stay in a trade
    Integer(2, 50, name='max_trade_length'),
    Real(0.2, 5, name='std'),
    Categorical(list(_AVERAGE_KINDS), name='mean_type'),
    Categorical(list(_AVERAGE_KINDS), name='bb_type'),
    Categorical(list(_PRICE_COLUMNS), name='mean_price'),
    Categorical(list(_PRICE_COLUMNS), name='bb_price'),
    Categorical(list(_PRICE_COLUMNS), name='entry_price'),
]

# Reference strategy that the optimised parameters are compared against
BENCHMARK_STRAT = dict(
    bb_length=30,
    mean_length=30,
    max_trade_length=10,
    std=1,
    mean_type='sma',
    bb_type='sma',
    mean_price='Close',
    bb_price='Close',
    entry_price='Close',
)
def get_mean_and_bb(
    df: pd.DataFrame,
    mean_type: str,
    mean_price: str,
    mean_length: int,
    bb_type: str,
    std: float,
    bb_price: str,
    bb_length: int,
) -> pd.DataFrame:
    '''
    Add columns to the price dataframe for the mean (or fair price) and the
    lower bollinger band used to signal a mean reversion trade.

    Parameters
    ----------
    df : price dataframe; it is modified in place and also returned
    mean_type : 'ema' (or the legacy alias 'exp') for an exponentially
        weighted mean, anything else for a simple rolling mean
    mean_price : column of df the mean is computed from
    mean_length : rolling window, or the centre of mass of the ewm
    bb_type : 'ema'/'exp' for an exponentially weighted standard deviation,
        anything else for a simple rolling standard deviation
    std : number of standard deviations the band sits below the mean
    bb_price : column of df the standard deviation is computed from
    bb_length : rolling window, or the centre of mass of the ewm

    Returns
    -------
    The same dataframe with 'mean' and 'bb' columns added.
    '''
    # The search space and the benchmark label the exponential variant 'ema';
    # previously only 'exp' was recognised, so 'ema' silently fell back to
    # the simple rolling calculation. Accept both spellings.
    exponential = ('ema', 'exp')

    if mean_type in exponential:
        # The length is the centre of mass (first positional ewm argument)
        df['mean'] = df[mean_price].ewm(com=mean_length, adjust=False).mean()
    else:
        df['mean'] = df[mean_price].rolling(mean_length).mean()

    if bb_type in exponential:
        spread = df[bb_price].ewm(com=bb_length, adjust=False).std()
    else:
        spread = df[bb_price].rolling(bb_length).std()

    df['bb'] = df['mean'] - std*spread
    return df
@nb.jit(nopython = True)
def run_backtest(
    entry_prices: np.array,
    open_prices: np.array,
    high_prices: np.array,
    bb: np.array,
    mean: np.array,
    max_trade_length: int,
) -> Tuple[np.array, np.array, np.array]:
    '''
    Simulate the strategy over the supplied price arrays.

    An entry is signalled when the previous day's entry price is at or below
    the previous day's band, and is filled at the current day's open. A held
    position is closed at the previous day's mean once the current high
    reaches it, or at the current open once the trade has been held for
    max_trade_length days. Signals only use the previous day's band and mean,
    which limits look-ahead bias.

    Returns the per-trade fractional results, the day indices on which
    positions were opened and the day indices on which they were closed.
    '''
    in_position = False
    fill_price = 0.0
    days_held = 0

    returns = []
    entries = []
    exits = []

    for day in range(2, open_prices.shape[0]):
        prev = day - 1

        if not in_position:
            # Flat: only look for an entry signal from the previous day
            if entry_prices[prev] <= bb[prev]:
                fill_price = open_prices[day]
                in_position = True
                entries.append(day)
                days_held = 1
            continue

        if high_prices[day] >= mean[prev]:
            # Target reached: exit at the previous day's mean
            returns.append(mean[prev]/fill_price - 1)
            in_position = False
            exits.append(day)
        elif days_held == max_trade_length:
            # Held for the maximum duration: exit at today's open
            returns.append(open_prices[day]/fill_price - 1)
            in_position = False
            exits.append(day)
        else:
            days_held += 1

    return np.array(returns), np.array(entries), np.array(exits)
@use_named_args(SPACE)
def objective(
    bb_length: int,
    mean_length: int,
    max_trade_length: int,
    std: float,
    mean_type: str,
    bb_type: str,
    mean_price: str,
    bb_price: str,
    entry_price: str,
):
    '''
    Objective function for the optimiser. The optimiser is a minimiser, so
    the chosen success metric (OBJ_TYPE) is negated before being returned.

    The strategy is backtested on the training window only
    (TRAIN_FROM <= Date <= TEST_FROM). Parameter sets that produce fewer than
    MIN_TRADES trades, or for which the metric is undefined, return a large
    positive penalty instead.

    Raises
    ------
    ValueError : if OBJ_TYPE is not one of the recognised metric names
    '''
    # Large positive value so the minimiser steers away from this region
    penalty = 1e6

    df = get_mean_and_bb(
        deepcopy(DF),
        mean_type,
        mean_price,
        mean_length,
        bb_type,
        std,
        bb_price,
        bb_length,
    )
    df = df.dropna().reset_index(drop=True)

    # Only optimise on the training set so that the testing set remains a
    # genuine out-of-sample check
    in_training = (df['Date'] >= TRAIN_FROM) & (df['Date'] <= TEST_FROM)
    df = df[in_training]

    # Run the backtest to generate the list of trades (res)
    res, _, _ = run_backtest(
        entry_prices=df[entry_price].values.astype(np.float64),
        open_prices=df['Open'].values.astype(np.float64),
        high_prices=df['High'].values.astype(np.float64),
        bb=df['bb'].values.astype(np.float64),
        mean=df['mean'].values.astype(np.float64),
        max_trade_length=max_trade_length,
    )

    # Penalise when fewer than MIN_TRADES trades were made (an empty result
    # is always penalised, since no metric can be computed from it)
    n_trades = res.shape[0]
    if n_trades == 0 or n_trades < MIN_TRADES:
        return penalty

    # Express the trade results as percentages
    res = res*100

    if OBJ_TYPE == 'sharpe ratio':
        spread = np.std(res)
        if spread == 0:
            # Identical trades give an undefined (inf/nan) ratio, which the
            # optimiser cannot work with
            return penalty
        score = np.sqrt(255)*np.mean(res)/spread
    elif OBJ_TYPE == 'mean trade':
        score = np.mean(res)
    elif OBJ_TYPE == 'median trade':
        score = np.median(res)
    elif OBJ_TYPE == 'compounded result':
        # Growth of one unit compounded over every trade
        score = np.prod(1 + res/100)
    else:
        # Previously an unknown name fell through and the negated trade
        # array was returned instead of a scalar
        raise ValueError(f'Unrecognised OBJ_TYPE: {OBJ_TYPE!r}')

    return -score
def evaluate_strategy(
    bb_length: int,
    mean_length: int,
    max_trade_length: int,
    std: float,
    mean_type: str,
    bb_type: str,
    mean_price: str,
    bb_price: str,
    entry_price: str,
) -> Tuple[dict, dict]:
    '''
    Backtest one parameter set on both the training and the testing window.

    Mirrors the calculation done inside the optimiser's objective, but keeps
    the full backtest output. Each returned dictionary holds the window's
    dataframe ('df'), the trade results ('res') and the day indices of the
    entries ('bought_on') and exits ('sold_on').

    Returns the (train, test) pair of dictionaries.
    '''
    prices = get_mean_and_bb(
        deepcopy(DF),
        mean_type,
        mean_price,
        mean_length,
        bb_type,
        std,
        bb_price,
        bb_length,
    ).dropna()

    def backtest_window(window: pd.DataFrame) -> dict:
        # Re-index from zero so the day indices returned by the backtest
        # line up with the rows of the stored dataframe
        window = window.reset_index(drop=True)

        def column(name: str) -> np.array:
            return window[name].values.astype(np.float64)

        res, bought_on, sold_on = run_backtest(
            entry_prices=column(entry_price),
            open_prices=column('Open'),
            high_prices=column('High'),
            bb=column('bb'),
            mean=column('mean'),
            max_trade_length=max_trade_length,
        )
        return {
            'df': window,
            'res': res,
            'bought_on': bought_on,
            'sold_on': sold_on,
        }

    dates = prices['Date']
    train_rows = prices[(dates >= TRAIN_FROM) & (dates <= TEST_FROM)]
    test_rows = prices[dates > TEST_FROM]

    train = backtest_window(train_rows)
    test = backtest_window(test_rows)
    return train, test
def get_equity_curve(case: dict) -> Tuple[np.array, np.array]:
    '''
    Generate the equity curve from a backtest result, for plotting.

    Parameters
    ----------
    case : dictionary with the keys 'df' (dataframe with a 'Date' column),
        'res' (fractional result of each closed trade) and 'sold_on' (row
        index in 'df' at which each trade was closed)

    Returns
    -------
    The dates of 'df' and the equity (multiple of the initial investment)
    at each of those dates.
    '''
    # Compounded growth of one unit: the starting value followed by the
    # equity after each closed trade
    trade_results = np.asarray(case['res'], dtype=np.float64)
    equity = np.concatenate([[1.0], np.cumprod(1 + trade_results)])

    df = case['df']
    dates = df['Date'].values

    # An empty index array created outside numba defaults to float dtype,
    # which cannot be used for indexing, so force an integer dtype
    exit_days = np.asarray(case['sold_on'], dtype=np.int64)

    # The starting equity is pinned to the second row of the window
    equity_dates = np.concatenate([dates[[1]], dates[exit_days]])
    equity_df = pd.DataFrame({
        'Date': equity_dates,
        'equity': equity,
    })

    # Join the equity back onto the full date range, then backfill (and
    # forward fill the tail) so there is a value for every date.
    # fillna(method=...) is deprecated in pandas, hence bfill()/ffill().
    merged = df.merge(equity_df, on='Date', how='left')
    filled = merged['equity'].bfill().ffill()
    return merged['Date'].values, filled.values
def plot_equity_curve(
    optimised: dict,
    benchmark: dict,
    title: str
):
    '''
    Show a figure comparing the equity curve of the optimised strategy with
    that of the benchmark strategy.
    '''
    opt_dates, opt_equity = get_equity_curve(optimised)
    bench_dates, bench_equity = get_equity_curve(benchmark)

    # The benchmark trace is added first, followed by the optimised one
    traces = (
        ('Benchmark Strategy', bench_dates, bench_equity),
        ('Optimised Strategy', opt_dates, opt_equity),
    )

    fig = go.Figure()
    for label, xs, ys in traces:
        fig.add_trace(go.Scatter(x=xs, y=ys, name=label))

    layout = {
        'yaxis': {'title': 'Multiple from Initial Investment'},
        'xaxis': {'title': 'Date'},
        'legend': {'x': 0, 'y': -0.1, 'orientation': 'h'},
        'margin': {'l': 50, 'r': 50, 'b': 50, 't': 25},
        'width': 600,
        'height': 600,
        'title': title,
    }
    fig.update_layout(**layout)
    fig.show()
def print_stats(trades: np.array):
    '''
    Print summary statistics for a set of trade results (mean, median,
    sharpe ratio and compounded return) so that the optimised strategy can
    be compared with the benchmark.
    '''
    # Growth of one unit compounded across every trade
    growth = 1
    for single in trades:
        growth = growth*(1 + single)

    # Each statistic is evaluated just before it is printed
    report = (
        ('- Mean trade:', lambda: np.mean(trades)),
        ('- Median trade:', lambda: np.median(trades)),
        ('- Sharpe ratio:',
         lambda: np.sqrt(255)*np.mean(trades)/np.std(trades)),
        ('- Compounded return:', lambda: growth),
    )
    for label, compute in report:
        print(label, compute())
if __name__ == '__main__':
    # Optimise the strategy on the training window, then compare it with the
    # benchmark on both the training and the testing window

    t0 = time.time()

    print('Starting the optimisation\n\n')
    result = gp_minimize(
        objective,
        SPACE,
        n_calls=N_ITERS,
        random_state=0,
        verbose=VERBOSE,
        n_jobs=-1,
    )

    # result.x is ordered like SPACE, so take the names from the dimensions
    # themselves rather than relying on the key order of another dictionary
    optim_params = {
        dimension.name: value
        for dimension, value in zip(SPACE, result.x)
    }

    # Pass the parameters by keyword so the calls do not depend on the
    # dictionaries being ordered like the function signature
    b_train, b_test = evaluate_strategy(**BENCHMARK_STRAT)
    train, test = evaluate_strategy(**optim_params)

    print('Training Set\n------------\n')
    print('Benchmark')
    print_stats(b_train['res'])
    print('\nOptimised')
    print_stats(train['res'])

    print('\n\nTesting Set\n-----------\n')
    print('Benchmark')
    print_stats(b_test['res'])
    print('\nOptimised')
    print_stats(test['res'])

    plot_equity_curve(train, b_train, 'Training Equity Curve')
    plot_equity_curve(test, b_test, 'Testing Equity Curve')

    print('\nOptimised parameters:', optim_params)
    print('\nTotal time taken:', time.time()-t0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment