Skip to content

Instantly share code, notes, and snippets.

@GrovesD2
Last active November 26, 2022 18:49
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save GrovesD2/f48f6f9b2af0bf498fc8cb62879c7359 to your computer and use it in GitHub Desktop.
Save GrovesD2/f48f6f9b2af0bf498fc8cb62879c7359 to your computer and use it in GitHub Desktop.
import time
import random
import numba as nb
import numpy as np
import pandas as pd
import datetime as dt
from tqdm import tqdm
from copy import deepcopy
# For type hinting
from typing import Tuple
DATA_DIR = 'PATH_TO_DATA'
# Global config variables
TRAINING_TICKERS = ['AAPL', 'MSFT', 'AMZN', 'GOOGL', 'BRK-B', 'GOOG', 'TSLA']
TESTING_TICKERS = ['SPY']
START_TRADING = '2017-01-01' # Date to start trading on
NUM_STRATS = 35 # Number of strategies to try on each evolution
NUM_EVOLVE = 100 # Number of evolutions to perform
KEEP_PERC = 0.3 # Percentage of top models to keep on each evolution
PRINT_EVL_PROG = False # Whether to print out the results for each evolution
DISABLE_PROGRESS_BAR = False # Whether to disable the GA progress bar
# This is how the fitness is evaluated over a single ticker, e.g. if the
# strategy produces 20 trades, then we may take the mean percentage gained
# over each trade.
# Implemented types: 'mean', 'median', 'compounded'
# Note: 'compounded' means how many multiples of your money would make by
# using the strategy on a single ticker.
STRAT_EVAL = 'compounded'
# This is the metric to take from the fitness values of all tickers, e.g. if
# choosing 'min', you select the min fitness over all tickers; this then
# becomes the strategy's fitness.
# Implemented types: 'min', 'mean', 'median'
FITNESS_TYPE = 'min'
# A minimum of x trades are taken per ticker to prevent overfitting, if you
# select 1, then the strategy will find the perfect combination for that
# ticker (but clearly this is not generalisable to other tickers)
MIN_TRADES = 3
# These are the number of weeks to use for the training and testing data, i.e.
# TRAIN_OVER = 12 and TEST_OVER = 6 means we take 12 weeks to optimise the
# hyperparameters over, and then test them on the next 6 weeks
TRAIN_OVER = 12
TEST_OVER = 6
# Strategy parameters to choose from
MA_TYPES = ['simple', 'exponential'] # Types of moving averages to consider
MA_FIELDS = ['Open', 'Low', 'High', 'Close'] # Price fields to choose from
LOWER_MA_LENGTH = 3 # The least length a moving average can have
UPPER_MA_LENGTH = 300 # The maximum length a moving average can
MAX_PERTURB = 10 # The maximum number to perturb the strategy parameters with
# The strategy to intially perturb, and also to use as a benchmark during
# the testing phase
STARTING_STRAT = {
'fast_ma_type': 'simple',
'slow_ma_type': 'simple',
'fast_ma_field': 'Close',
'slow_ma_field': 'Close',
'fast_ma_length': 10,
'slow_ma_length': 20,
}
def get_random_strat() -> dict:
'''
Generate a fresh random strategy by randomly selecting from the parameters
definied in the global variables.
'''
return check_strat({
'fast_ma_type': random.choice(MA_TYPES),
'slow_ma_type': random.choice(MA_TYPES),
'fast_ma_field': random.choice(MA_FIELDS),
'slow_ma_field': random.choice(MA_FIELDS),
'fast_ma_length': random.randint(LOWER_MA_LENGTH, UPPER_MA_LENGTH),
'slow_ma_length': random.randint(LOWER_MA_LENGTH, UPPER_MA_LENGTH),
})
def check_strat(strat: dict) -> dict:
'''
This checks if the strategy has valid parameters, and adjusts if not. For
example, if the slower moving average has a smaller length than the faster
one, then this is changed to having a larger value.
'''
for ma_type in ['slow', 'fast']:
if strat[ma_type + '_ma_length'] < LOWER_MA_LENGTH:
strat[ma_type + '_ma_length'] = LOWER_MA_LENGTH
elif strat[ma_type + '_ma_length'] > UPPER_MA_LENGTH:
strat[ma_type + '_ma_length'] = UPPER_MA_LENGTH
if strat['slow_ma_length'] <= strat['fast_ma_length']:
strat['slow_ma_length'] = strat['fast_ma_length'] + 1
return strat
def perturb_strat(strat: dict) -> dict:
'''
Perturb the parameters of the strategy slightly to generate a new strategy
'''
for ma_type in ['slow', 'fast']:
strat[ma_type + '_ma_type'] = random.choice(MA_TYPES)
strat[ma_type + '_ma_field'] = random.choice(MA_FIELDS)
strat[ma_type + '_ma_length'] += (
np.random.randint(-MAX_PERTURB, MAX_PERTURB)
)
return check_strat(strat)
def breed_winning_strats(good_strats: np.array,
strats: dict) -> dict:
'''
Taking parameters from good/winning strategies and breed a new strategy.
Parameters
----------
good_strats : np.array
The index values of the best strategies from the evolution
strats : dict
The dictionary of all strategies
'''
new_strat = {}
for param in strats['0'].keys():
rand_strat_idx = str(random.choice(good_strats))
new_strat[param] = strats[rand_strat_idx][param]
return check_strat(new_strat)
def init_ga() -> Tuple[dict, np.array, np.array]:
'''
Initialise the parameters and data needed for the genetic algorithm
Returns
-------
strats : dict
A random set of strategies
fitness : np_arr
An array to store the fitness values in for each strategy
fitness_to_calc : np_arr
An array to indicate which strategies to calculate the fitness for
'''
# Initialise by finding NUM_STRATS strategies which are perturbations from
# the starting strategy defined in the global variables
strats = {
f'{n}': perturb_strat(deepcopy(STARTING_STRAT))
for n in range(0, NUM_STRATS)
}
# Initialise an empty array to store the fitness values in, col 1 is the
# idx value of the strategy, and col 2 stores the fitness value
fitness = np.zeros((NUM_STRATS, 2))
fitness[:, 0] = np.arange(0, NUM_STRATS)
# Initialise the array to determine which strategies to calculate the
# fitness for. Initially its all of them, but in the optimisation we only
# need to calculate for some of them
fitness_to_calc = np.arange(0, NUM_STRATS)
return strats, fitness, fitness_to_calc
def get_fitness(price_data: list,
strats: dict,
fitness: np.array,
fitness_to_calc: np.array,
lower_filter: str,
upper_filter: str) -> np.array:
'''
Loop over and obtain the fitness for each of the strategies which require
a new fitness calculation.
'''
for idx in fitness_to_calc:
fitness[idx, 1] = strat_fitness(
price_data,
strats[str(idx)],
lower_filter,
upper_filter,
)
return fitness
def strat_fitness(price_data: list,
strat: dict,
lower_filter: str,
upper_filter: str,
testing: bool = False) -> float:
'''
Calculate the fitness value for a one strategy over all of the price data.
'''
fitness = []
for df in price_data:
# Firstly process the price data to include the ma cols (as per the
# strategy).
df_strat = get_ma_cols(deepcopy(df), strat)
# Filter to the training/testing range
df_strat = df_strat[
(df_strat['Date'] >= lower_filter)
& (df_strat['Date'] <= upper_filter)
]
# Run the strategy for this ticker's price data, and return a list of
# percentage gains/losses for each trade.
trade_res = run_strat(
df_strat['Open'].values.astype(np.float64),
df_strat['fast'].values.astype(np.float64),
df_strat['slow'].values.astype(np.float64),
)
if STRAT_EVAL == 'mean':
fitness_val = np.mean(trade_res)
elif STRAT_EVAL == 'median':
fitness_val = np.median(trade_res)
elif STRAT_EVAL == 'compounded':
fitness_val = get_compounded(trade_res)
else:
raise ValueError(
'The strategy average ' + STRAT_EVAL +
' has not been implemented.'
)
# This implements the minimum trade per ticker constraint, if we have
# less than the minimum trades, the fitness value is set to be an
# extreme low value to strongly encourage against using this strategy
# NOTE: This is only implemented for training, not for testing
if trade_res.shape[0] > MIN_TRADES or testing:
fitness.append(fitness_val)
else:
fitness.append(-100)
if FITNESS_TYPE == 'min':
return np.min(fitness)
elif FITNESS_TYPE == 'mean':
return np.mean(fitness)
elif FITNESS_TYPE == 'median':
return np.median(fitness)
else:
raise ValueError(
'The fitness type ' + FITNESS_TYPE +
' has not been implemented.'
)
@nb.jit(nopython = True)
def get_compounded(trade_res: np.array):
'''
Get the strategy return as multiples of your initial investment.
'''
invest = 1
for perc in trade_res:
invest = (1+perc)*invest
return invest
def get_ma_cols(df: pd.DataFrame, strat: dict) -> pd.DataFrame:
'''
Add the moving average columns to the dataset, as per the strategy config.
'''
for ma_type in ['slow', 'fast']:
if strat[ma_type + '_ma_type'] == 'simple':
df[ma_type] = (
df[strat[ma_type + '_ma_field']]
.rolling(strat[ma_type + '_ma_length'])
.mean()
)
elif strat[ma_type + '_ma_type'] == 'exponential':
df[ma_type] = (
df[strat[ma_type + '_ma_field']]
.ewm(span = strat[ma_type + '_ma_length'], adjust = False)
.mean()
)
else:
raise ValueError(
'There is no current implementation for the ' +
strat[ma_type + '_ma_type'] + ' moving average type.'
)
return df
@nb.jit(nopython = True)
def run_strat(open_prices: np.array,
fast_ma: np.array,
slow_ma: np.array) -> np.array:
'''
Run the ma crossover strategy. Here, we buy the day after the fast ma
crosses from below the slow ma, and sell when the opposite occurs.
Parameters
----------
open_prices : np.array
The financial instrument open prices on each day
fast_ma : np.array
The faster moving average
slow_ma : np.array
The slower moving average
Returns
-------
trade_res : np.array
The percentage gained/lost on each trade
'''
# Flag to determine whether the instrument is currently held or not
holding = False
# Empty lists to store the results from the strategy
trade_res = []
# The logical criteria for if a ma crossover happens, both on the buy and
# sell side
ma_buy = lambda day: (
fast_ma[day-2] < slow_ma[day-2] and
fast_ma[day-1] > slow_ma[day-1]
)
ma_sell = lambda day: (
fast_ma[day-2] > slow_ma[day-2] and
fast_ma[day-1] < slow_ma[day-1]
)
for day in range(2, open_prices.shape[0]):
if not holding and ma_buy(day):
bought_at = open_prices[day]
holding = True
elif holding and ma_sell(day):
trade_res.append(open_prices[day]/bought_at - 1)
holding = False
# Close out our position at the end of the trading period
if holding:
trade_res.append(open_prices[day]/bought_at - 1)
return np.array(trade_res)
def get_price_data(tickers: list) -> list:
'''
Load in all the price data, and store as a list of pandas dataframes
Parameters
----------
tickers : list
The tickers to load the price data for
'''
return [
pd.read_csv(f'{DATA_DIR}{ticker}.csv')
for ticker in tickers
]
def add_weeks(date: str, weeks: int) -> str:
'''
Add a set number of weeks to a string date of the format yyyy-mm-dd
'''
date = dt.datetime.strptime(date, '%Y-%m-%d')
date = date + dt.timedelta(weeks = weeks)
return dt.datetime.strftime(date, '%Y-%m-%d')
def optimise(price_data: list,
lower_filter: str,
upper_filter: str) -> dict:
'''
The genetic algorithm optimiser
Parameters
----------
price_data : list
The price data for this iteration of optimisation
lower_filter : str
The lower date filter for training
upper_filter : str
The upper date filter for training
Returns
-------
dict
The optimised strategy parameters
'''
# Initialise all the parameters needed to start the evolution
strats, fitness, fitness_to_calc = init_ga()
# This defines the number of strategies to change on each evolution
num_to_change = int((1-KEEP_PERC)*NUM_STRATS)
for evl in tqdm(range(0, NUM_EVOLVE), disable = DISABLE_PROGRESS_BAR):
fitness = get_fitness(
price_data,
strats,
fitness,
fitness_to_calc,
lower_filter,
upper_filter,
)
# Rank the strategies, and select the strategies to change
ranks = fitness[fitness[:, 1].argsort()]
good_strats = ranks[num_to_change:, 0].astype(np.int32)
bad_strats = ranks[:num_to_change, 0].astype(np.int32)
# Split the bad strategies into 3 approx equal sets to make changes
splits = np.array_split(bad_strats, 3)
# Replace some bad strategies with random new ones
for strat in splits[0]:
strats[str(strat)] = get_random_strat()
# Add random perturbations to some good strategies
for strat in splits[1]:
rand_strat = str(random.choice(good_strats))
strats[str(strat)] = perturb_strat(deepcopy(strats[rand_strat]))
# Combine good strategies to make new ones
for strat in splits[2]:
strats[str(strat)] = breed_winning_strats(
good_strats,
deepcopy(strats),
)
# This shows the optimiser which strats have been changed to calculate
# the fitness function on the next iteration. This saves us having to
# recalculate the fitness function for the good strategies and save
# computational time
fitness_to_calc = bad_strats
if PRINT_EVL_PROG:
# Print out evolution statistics for the best five strategies, this
# is helpful to see if the optimiser is doing the correct job (i.e.
# is the fitness being maximised?)
print(f'\nEvolution {evl}')
for count, strat in enumerate(np.flipud(good_strats[-5:])):
print(
str(count) + '. Strategy: ' + str(strat) +
', ' + FITNESS_TYPE + ': ' +
str(fitness[strat, 1])
)
print('----------------------------------------------')
# Return the most optimal strategy after all evolutions
return strats[str(good_strats[-1])]
def main(training_data: list, testing_data: list) -> pd.DataFrame:
'''
From the START_TRADING, this function performs the genetic algorithm
optimisation over TO_TRAIN weeks, and then tests on the next TO_TEST
weeks; after this is finished, this cycle repeats but pushed forwards
TO_TEST weeks so that we optimise on the most recent data and then perform
another feed-forward test. This cycle is repeated until the data stream
ends.
Parameters
----------
training_data : list
A list of pandas dataframes with the training data
testing_data : list
A list of pandas dataframes with the testing data
Returns
-------
pd.DataFrame
The pandas dataframe with the optimisation testing results
'''
lower_date_train = add_weeks(START_TRADING, -TRAIN_OVER)
upper_date_test = START_TRADING
today = dt.datetime.today()
count = 0
stats = []
while dt.datetime.strptime(upper_date_test, '%Y-%m-%d') < today:
print(f'Optimisation {count}')
# Find the date range filters
upper_date_train = add_weeks(lower_date_train, TRAIN_OVER)
upper_date_test = add_weeks(upper_date_train, TEST_OVER)
# Perform the optimisation
strat = optimise(
training_data,
lower_date_train,
upper_date_train,
)
baseline = strat_fitness(
testing_data,
STARTING_STRAT,
upper_date_train,
upper_date_test,
True,
)
optimised = strat_fitness(
testing_data,
strat,
upper_date_train,
upper_date_test,
True,
)
print(f'Lowest training date: {lower_date_train}')
print(f'Upper training date: {upper_date_train}')
print(f'Upper testing date: {upper_date_test}')
print('Testing values before optimisation:', baseline)
print('Testing values after optimisation:', optimised)
print('\n')
stats.append([
count,
lower_date_train,
upper_date_train,
upper_date_test,
baseline,
optimised,
])
count += 1
lower_date_train = add_weeks(lower_date_train, TEST_OVER)
return pd.DataFrame(
data = stats,
columns = [
'optimisation', 'lower_date_train', 'upper_date_train',
'upper_date_test', 'baseline', 'optimised',
]
)
if __name__ == '__main__':
t0 = time.time()
training_data = get_price_data(TRAINING_TICKERS)
testing_data = get_price_data(TESTING_TICKERS)
res = main(training_data, testing_data)
res.to_csv(
f'{START_TRADING} - train_{TRAIN_OVER}_test_{TEST_OVER}_2.csv',
index = False,
)
print('\nFull testing time :', str(time.time()-t0))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment