# Last active May 20, 2024 18:38
import time
import random
import numba as nb
import numpy as np
import pandas as pd
import datetime as dt
from tqdm import tqdm
from copy import deepcopy
# For type hinting
from typing import Tuple
# Global config variables
START_TRADING = '2017-01-01' # Date to start trading on (format yyyy-mm-dd)
NUM_STRATS = 35 # Number of strategies to try on each evolution
NUM_EVOLVE = 100 # Number of evolutions to perform
KEEP_PERC = 0.3 # Fraction (0-1) of top strategies to keep on each evolution
PRINT_EVL_PROG = False # Whether to print out the results for each evolution
DISABLE_PROGRESS_BAR = False # Whether to disable the GA progress bar
# This is how the fitness is evaluated over a single ticker, e.g. if the
# strategy produces 20 trades, then we may take the mean percentage gained
# over each trade.
# Implemented types: 'mean', 'median', 'compounded'
# Note: 'compounded' means how many multiples of your money you would make by
# using the strategy on a single ticker.
STRAT_EVAL = 'compounded'
# This is the metric to take from the fitness values of all tickers, e.g. if
# choosing 'min', you select the min fitness over all tickers; this then
# becomes the strategy's fitness.
# Implemented types: 'min', 'mean', 'median'
# NOTE(review): the assignment this comment describes (FITNESS_TYPE, which is
# read by strat_fitness) is not visible in this section -- confirm it exists.
# A minimum of x trades are taken per ticker to prevent overfitting; if you
# select 1, then the strategy will find the perfect combination for that
# ticker (but clearly this is not generalisable to other tickers).
# NOTE(review): the assignment this comment describes (MIN_TRADES, which is
# read by strat_fitness) is not visible in this section -- confirm it exists.
# These are the number of weeks to use for the training and testing data, i.e.
# TRAIN_OVER = 12 and TEST_OVER = 6 means we take 12 weeks to optimise the
# hyperparameters over, and then test them on the next 6 weeks.
# NOTE(review): the TRAIN_OVER / TEST_OVER assignments (read by main) are not
# visible in this section -- confirm they exist.
# Strategy parameters to choose from
MA_TYPES = ['simple', 'exponential'] # Types of moving averages to consider
MA_FIELDS = ['Open', 'Low', 'High', 'Close'] # Price fields to choose from
LOWER_MA_LENGTH = 3 # The least length a moving average can have
UPPER_MA_LENGTH = 300 # The maximum length a moving average can have
MAX_PERTURB = 10 # Maximum amount to perturb a moving average length by
# The strategy to initially perturb, and also to use as a benchmark during
# the testing phase. Keys follow the '<fast|slow>_ma_<type|field|length>'
# scheme that every strategy dictionary in this file uses.
STARTING_STRAT = {
    'fast_ma_type': 'simple',
    'slow_ma_type': 'simple',
    'fast_ma_field': 'Close',
    'slow_ma_field': 'Close',
    'fast_ma_length': 10,
    'slow_ma_length': 20,
}
def get_random_strat() -> dict:
    """
    Generate a fresh random strategy by randomly selecting from the parameters
    defined in the global variables.

    Returns
    -------
    dict
        A strategy dictionary whose moving-average types, price fields and
        lengths are drawn from MA_TYPES, MA_FIELDS and the inclusive range
        [LOWER_MA_LENGTH, UPPER_MA_LENGTH], after being passed through
        check_strat.
    """
    return check_strat({
        'fast_ma_type': random.choice(MA_TYPES),
        'slow_ma_type': random.choice(MA_TYPES),
        'fast_ma_field': random.choice(MA_FIELDS),
        'slow_ma_field': random.choice(MA_FIELDS),
        # Both lengths are drawn independently; check_strat repairs the
        # ordering if the slow length does not exceed the fast one.
        'fast_ma_length': random.randint(LOWER_MA_LENGTH, UPPER_MA_LENGTH),
        'slow_ma_length': random.randint(LOWER_MA_LENGTH, UPPER_MA_LENGTH),
    })
def check_strat(strat: dict) -> dict:
    """
    Check that the strategy has valid parameters, and adjust it if not. For
    example, if the slower moving average has a smaller length than the faster
    one, then it is changed to have a larger value.

    Parameters
    ----------
    strat : dict
        The strategy to validate. It is modified in place.

    Returns
    -------
    dict
        The same dictionary, with both lengths clamped to
        [LOWER_MA_LENGTH, UPPER_MA_LENGTH] and slow_ma_length strictly
        greater than fast_ma_length.
    """
    for ma_type in ('slow', 'fast'):
        key = ma_type + '_ma_length'
        if strat[key] < LOWER_MA_LENGTH:
            strat[key] = LOWER_MA_LENGTH
        elif strat[key] > UPPER_MA_LENGTH:
            strat[key] = UPPER_MA_LENGTH

    if strat['slow_ma_length'] <= strat['fast_ma_length']:
        # Leave room for slow = fast + 1 under the upper bound; without this,
        # a fast length sitting at UPPER_MA_LENGTH would push the slow length
        # to UPPER_MA_LENGTH + 1.
        # NOTE: this assumes UPPER_MA_LENGTH > LOWER_MA_LENGTH.
        if strat['fast_ma_length'] >= UPPER_MA_LENGTH:
            strat['fast_ma_length'] = UPPER_MA_LENGTH - 1
        strat['slow_ma_length'] = strat['fast_ma_length'] + 1

    return strat
def perturb_strat(strat: dict) -> dict:
    """
    Perturb the parameters of the strategy slightly to generate a new strategy.

    The moving-average type and price field of both averages are re-drawn at
    random, and each length is shifted by a random integer in the inclusive
    range [-MAX_PERTURB, MAX_PERTURB].

    Parameters
    ----------
    strat : dict
        The strategy to perturb. It is modified in place, so pass a copy if
        the original must be preserved.

    Returns
    -------
    dict
        The perturbed strategy after validation by check_strat.
    """
    for ma_type in ('slow', 'fast'):
        strat[ma_type + '_ma_type'] = random.choice(MA_TYPES)
        strat[ma_type + '_ma_field'] = random.choice(MA_FIELDS)
        # np.random.randint excludes its upper bound, so + 1 is needed for a
        # symmetric perturbation. int() keeps the length a plain Python int
        # rather than a numpy scalar.
        strat[ma_type + '_ma_length'] += int(
            np.random.randint(-MAX_PERTURB, MAX_PERTURB + 1)
        )

    return check_strat(strat)
def breed_winning_strats(good_strats: np.ndarray,
                         strats: dict) -> dict:
    """
    Take parameters from good/winning strategies and breed a new strategy.

    Each parameter of the child is copied from an independently chosen,
    randomly selected winning strategy.

    Parameters
    ----------
    good_strats : np.ndarray
        The index values of the best strategies from the evolution
    strats : dict
        The dictionary of all strategies, keyed by the string form of the
        strategy index

    Returns
    -------
    dict
        The bred strategy after validation by check_strat.
    """
    # Any strategy can serve as the template for the parameter names; taking
    # the first one avoids depending on a strategy keyed '0' being present.
    template = next(iter(strats.values()))

    new_strat = {}
    for param in template:
        rand_strat_idx = str(random.choice(good_strats))
        new_strat[param] = strats[rand_strat_idx][param]

    return check_strat(new_strat)
def init_ga() -> Tuple[dict, np.ndarray, np.ndarray]:
    """
    Initialise the parameters and data needed for the genetic algorithm.

    Returns
    -------
    strats : dict
        NUM_STRATS strategies keyed '0' .. str(NUM_STRATS - 1), each a
        perturbation of STARTING_STRAT
    fitness : np.ndarray
        A (NUM_STRATS, 2) array; column 0 holds the strategy index and
        column 1 holds its fitness value (initially zero)
    fitness_to_calc : np.ndarray
        The indices of the strategies to calculate the fitness for
    """
    # Initialise by finding NUM_STRATS strategies which are perturbations from
    # the starting strategy defined in the global variables. deepcopy keeps
    # STARTING_STRAT itself untouched, as perturb_strat works in place.
    strats = {
        f'{n}': perturb_strat(deepcopy(STARTING_STRAT))
        for n in range(0, NUM_STRATS)
    }

    # Initialise an empty array to store the fitness values in, col 1 is the
    # idx value of the strategy, and col 2 stores the fitness value
    fitness = np.zeros((NUM_STRATS, 2))
    fitness[:, 0] = np.arange(0, NUM_STRATS)

    # Initialise the array to determine which strategies to calculate the
    # fitness for. Initially it's all of them, but in the optimisation we only
    # need to calculate for some of them
    fitness_to_calc = np.arange(0, NUM_STRATS)

    return strats, fitness, fitness_to_calc
def get_fitness(price_data: list,
                strats: dict,
                fitness: np.ndarray,
                fitness_to_calc: np.ndarray,
                lower_filter: str,
                upper_filter: str) -> np.ndarray:
    """
    Loop over and obtain the fitness for each of the strategies which require
    a new fitness calculation.

    Parameters
    ----------
    price_data : list
        The price data, one pandas dataframe per ticker
    strats : dict
        The dictionary of all strategies, keyed by the string form of the
        strategy index
    fitness : np.ndarray
        The fitness array; column 1 of row ``idx`` is overwritten in place
        for every ``idx`` in fitness_to_calc
    fitness_to_calc : np.ndarray
        The indices of the strategies to (re)calculate the fitness for
    lower_filter : str
        The lower date filter
    upper_filter : str
        The upper date filter

    Returns
    -------
    np.ndarray
        The same fitness array, updated.
    """
    for idx in fitness_to_calc:
        # Strategies are keyed by the string form of their index, while the
        # fitness array is addressed by the integer index itself.
        fitness[idx, 1] = strat_fitness(
            price_data,
            strats[str(idx)],
            lower_filter,
            upper_filter,
        )

    return fitness
def strat_fitness(price_data: list,
                  strat: dict,
                  lower_filter: str,
                  upper_filter: str,
                  testing: bool = False) -> float:
    """
    Calculate the fitness value for one strategy over all of the price data.

    Parameters
    ----------
    price_data : list
        The price data, one pandas dataframe per ticker
    strat : dict
        The strategy to evaluate
    lower_filter : str
        The lower (inclusive) bound applied to the 'Date' column
    upper_filter : str
        The upper (inclusive) bound applied to the 'Date' column
    testing : bool
        When True, the minimum-trades constraint is not enforced

    Returns
    -------
    float
        The per-ticker fitness values aggregated according to FITNESS_TYPE.

    Raises
    ------
    ValueError
        If STRAT_EVAL or FITNESS_TYPE names an unimplemented option.
    """
    # NOTE(review): the penalty value used originally is not recoverable from
    # the file as found; any value far below every achievable fitness serves
    # the documented purpose -- confirm the intended number.
    too_few_trades_penalty = -100.0

    fitness = []
    for df in price_data:
        # Firstly process the price data to include the ma cols (as per the
        # strategy). The averages are computed before filtering so that they
        # are already warmed up at the start of the date range.
        df_strat = get_ma_cols(deepcopy(df), strat)

        # Filter to the training/testing range
        df_strat = df_strat[
            (df_strat['Date'] >= lower_filter)
            & (df_strat['Date'] <= upper_filter)
        ]

        # Run the strategy for this ticker's price data, and return a list of
        # percentage gains/losses for each trade.
        trade_res = run_strat(
            df_strat['Open'].to_numpy(),
            df_strat['fast'].to_numpy(),
            df_strat['slow'].to_numpy(),
        )
        num_trades = trade_res.shape[0]

        # With no trades, np.mean/np.median would return NaN (and warn); a
        # strategy that never traded gained nothing, so report 0 instead.
        if STRAT_EVAL == 'mean':
            fitness_val = np.mean(trade_res) if num_trades else 0.0
        elif STRAT_EVAL == 'median':
            fitness_val = np.median(trade_res) if num_trades else 0.0
        elif STRAT_EVAL == 'compounded':
            fitness_val = get_compounded(trade_res)
        else:
            raise ValueError(
                'The strategy average ' + STRAT_EVAL +
                ' has not been implemented.'
            )

        # This implements the minimum trade per ticker constraint, if we have
        # less than the minimum trades, the fitness value is set to be an
        # extreme low value to strongly encourage against using this strategy
        # NOTE: This is only implemented for training, not for testing
        # `>=` so that exactly MIN_TRADES trades satisfies the "minimum".
        if num_trades >= MIN_TRADES or testing:
            fitness.append(fitness_val)
        else:
            fitness.append(too_few_trades_penalty)

    if FITNESS_TYPE == 'min':
        return np.min(fitness)
    elif FITNESS_TYPE == 'mean':
        return np.mean(fitness)
    elif FITNESS_TYPE == 'median':
        return np.median(fitness)
    else:
        raise ValueError(
            'The fitness type ' + FITNESS_TYPE +
            ' has not been implemented.'
        )
@nb.jit(nopython=True)
def get_compounded(trade_res: np.ndarray) -> float:
    """
    Get the strategy return as multiples of your initial investment.

    Parameters
    ----------
    trade_res : np.ndarray
        The fractional gain/loss of each trade, e.g. 0.05 for +5%

    Returns
    -------
    float
        The product of (1 + gain) over all trades; 1.0 when there are none.
    """
    # Start from a float so the accumulator has a single type throughout.
    invest = 1.0
    for perc in trade_res:
        invest = (1.0 + perc) * invest

    return invest
def get_ma_cols(df: pd.DataFrame, strat: dict) -> pd.DataFrame:
    """
    Add the moving average columns to the dataset, as per the strategy config.

    Parameters
    ----------
    df : pd.DataFrame
        The price data; it must contain the price fields named by the
        strategy. The 'slow' and 'fast' columns are added in place.
    strat : dict
        The strategy, providing '<slow|fast>_ma_type', '<slow|fast>_ma_field'
        and '<slow|fast>_ma_length'

    Returns
    -------
    pd.DataFrame
        The same dataframe with the 'slow' and 'fast' columns added.

    Raises
    ------
    ValueError
        If a moving average type is neither 'simple' nor 'exponential'.
    """
    for ma_type in ('slow', 'fast'):
        kind = strat[ma_type + '_ma_type']
        prices = df[strat[ma_type + '_ma_field']]
        length = strat[ma_type + '_ma_length']

        if kind == 'simple':
            # The first (length - 1) rows are NaN until the window fills.
            df[ma_type] = prices.rolling(length).mean()
        elif kind == 'exponential':
            df[ma_type] = prices.ewm(span=length, adjust=False).mean()
        else:
            raise ValueError(
                'There is no current implementation for the ' +
                kind + ' moving average type.'
            )

    return df
@nb.jit(nopython=True)
def run_strat(open_prices: np.ndarray,
              fast_ma: np.ndarray,
              slow_ma: np.ndarray) -> np.ndarray:
    """
    Run the ma crossover strategy. Here, we buy the day after the fast ma
    crosses from below the slow ma, and sell when the opposite occurs.

    Parameters
    ----------
    open_prices : np.ndarray
        The financial instrument open prices on each day
    fast_ma : np.ndarray
        The faster moving average
    slow_ma : np.ndarray
        The slower moving average

    Returns
    -------
    trade_res : np.ndarray
        The percentage gained/lost on each trade
    """
    num_days = open_prices.shape[0]

    # Flag to determine whether the instrument is currently held or not
    holding = False
    # Given a value up front so it is always defined (and typed) before use
    bought_at = 0.0

    # Empty list to store the results from the strategy
    trade_res = []

    for day in range(2, num_days):
        # A crossover is judged on the two days before `day`, so the trade
        # itself happens at the open of the day after the cross. Comparisons
        # against NaN averages are False, so no trade fires during warm-up.
        crossed_up = (
            fast_ma[day-2] < slow_ma[day-2] and
            fast_ma[day-1] > slow_ma[day-1]
        )
        crossed_down = (
            fast_ma[day-2] > slow_ma[day-2] and
            fast_ma[day-1] < slow_ma[day-1]
        )

        if not holding and crossed_up:
            bought_at = open_prices[day]
            holding = True
        elif holding and crossed_down:
            trade_res.append(open_prices[day]/bought_at - 1)
            holding = False

    # Close out our position at the end of the trading period, at the last
    # available open price
    if holding:
        trade_res.append(open_prices[num_days - 1]/bought_at - 1)

    return np.array(trade_res)
def get_price_data(tickers: list) -> list:
# NOTE(review): the next three lines are docstring text whose triple-quote
# delimiters are missing from the file as found; they need to be re-wrapped
# in a docstring for this definition to parse.
Load in all the price data, and store as a list of pandas dataframes
tickers : list
The tickers to load the price data for
# NOTE(review): the element expression of this list comprehension (the call
# that loads one ticker's dataframe) and the closing bracket are missing from
# the file as found. Judging by how the frames are used elsewhere in this
# file, each presumably needs 'Date', 'Open', 'Low', 'High' and 'Close'
# columns -- confirm against the original loader before restoring it.
return [
for ticker in tickers
def add_weeks(date: str, weeks: int) -> str:
    """
    Add a set number of weeks to a string date of the format yyyy-mm-dd.

    Parameters
    ----------
    date : str
        The date to shift, formatted yyyy-mm-dd
    weeks : int
        The number of weeks to add; negative values move backwards in time

    Returns
    -------
    str
        The shifted date, formatted yyyy-mm-dd.
    """
    date_format = '%Y-%m-%d'
    # Use a separate name rather than rebinding the str parameter to a
    # datetime object.
    shifted = (
        dt.datetime.strptime(date, date_format)
        + dt.timedelta(weeks=weeks)
    )
    return shifted.strftime(date_format)
def optimise(price_data: list,
             lower_filter: str,
             upper_filter: str) -> dict:
    """
    The genetic algorithm optimiser.

    Parameters
    ----------
    price_data : list
        The price data for this iteration of optimisation
    lower_filter : str
        The lower date filter for training
    upper_filter : str
        The upper date filter for training

    Returns
    -------
    dict
        The optimised strategy parameters
    """
    # Initialise all the parameters needed to start the evolution
    strats, fitness, fitness_to_calc = init_ga()

    # This defines the number of strategies to change on each evolution
    num_to_change = int((1-KEEP_PERC)*NUM_STRATS)

    for evl in tqdm(range(0, NUM_EVOLVE), disable=DISABLE_PROGRESS_BAR):
        fitness = get_fitness(
            price_data,
            strats,
            fitness,
            fitness_to_calc,
            lower_filter,
            upper_filter,
        )

        # Rank the strategies (ascending fitness, so the best come last), and
        # select the strategies to change
        ranks = fitness[fitness[:, 1].argsort()]
        good_strats = ranks[num_to_change:, 0].astype(np.int32)
        bad_strats = ranks[:num_to_change, 0].astype(np.int32)

        # Split the bad strategies into 3 approx equal sets to make changes
        splits = np.array_split(bad_strats, 3)

        # Replace some bad strategies with random new ones
        for idx in splits[0]:
            strats[str(idx)] = get_random_strat()

        # Add random perturbations to some good strategies
        for idx in splits[1]:
            rand_strat = str(random.choice(good_strats))
            # deepcopy so the good strategy itself is left unchanged
            strats[str(idx)] = perturb_strat(deepcopy(strats[rand_strat]))

        # Combine good strategies to make new ones
        for idx in splits[2]:
            strats[str(idx)] = breed_winning_strats(good_strats, strats)

        # This shows the optimiser which strats have been changed to calculate
        # the fitness function on the next iteration. This saves us having to
        # recalculate the fitness function for the good strategies and save
        # computational time
        fitness_to_calc = bad_strats

        # Print out evolution statistics for the best five strategies, this
        # is helpful to see if the optimiser is doing the correct job (i.e.
        # is the fitness being maximised?)
        if PRINT_EVL_PROG:
            print(f'\nEvolution {evl}')
            for count, idx in enumerate(np.flipud(good_strats[-5:])):
                print(
                    str(count) + '. Strategy: ' + str(idx) +
                    ', ' + FITNESS_TYPE + ': ' +
                    str(fitness[idx, 1])
                )

    # Return the most optimal strategy after all evolutions; good strategies
    # are never overwritten above, so this is the one that was ranked best.
    return strats[str(good_strats[-1])]
def main(training_data: list, testing_data: list) -> pd.DataFrame:
    """
    From START_TRADING, this function performs the genetic algorithm
    optimisation over TRAIN_OVER weeks, and then tests on the next TEST_OVER
    weeks; after this is finished, the cycle repeats but pushed forwards
    TEST_OVER weeks so that we optimise on the most recent data and then
    perform another feed-forward test. The cycle is repeated until the upper
    testing date reaches the current date.

    Parameters
    ----------
    training_data : list
        A list of pandas dataframes with the training data
    testing_data : list
        A list of pandas dataframes with the testing data

    Returns
    -------
    pd.DataFrame
        The pandas dataframe with the optimisation testing results, one row
        per optimisation cycle
    """
    # The first training window ends exactly on START_TRADING
    lower_date_train = add_weeks(START_TRADING, -TRAIN_OVER)
    upper_date_test = START_TRADING
    # NOTE(review): the right-hand side of this assignment was missing from
    # the file as found; the current date is assumed, as the loop below
    # compares it against datetime objects -- confirm.
    today = dt.datetime.today()

    count = 0
    stats = []
    while dt.datetime.strptime(upper_date_test, '%Y-%m-%d') < today:
        print(f'Optimisation {count}')

        # Find the date range filters
        upper_date_train = add_weeks(lower_date_train, TRAIN_OVER)
        upper_date_test = add_weeks(upper_date_train, TEST_OVER)

        # Perform the optimisation
        strat = optimise(
            training_data,
            lower_date_train,
            upper_date_train,
        )

        # Score the un-optimised benchmark and the optimised strategy over
        # the same out-of-sample window. testing=True switches off the
        # minimum-trades constraint.
        baseline = strat_fitness(
            testing_data,
            STARTING_STRAT,
            upper_date_train,
            upper_date_test,
            testing=True,
        )
        optimised = strat_fitness(
            testing_data,
            strat,
            upper_date_train,
            upper_date_test,
            testing=True,
        )

        print(f'Lowest training date: {lower_date_train}')
        print(f'Upper training date: {upper_date_train}')
        print(f'Upper testing date: {upper_date_test}')
        print('Testing values before optimisation:', baseline)
        print('Testing values after optimisation:', optimised)

        # One row per cycle, in the same order as the columns below
        stats.append([
            count, lower_date_train, upper_date_train,
            upper_date_test, baseline, optimised,
        ])

        count += 1
        # Slide the whole window forwards by the length of the test period
        lower_date_train = add_weeks(lower_date_train, TEST_OVER)

    return pd.DataFrame(
        data=stats,
        columns=[
            'optimisation', 'lower_date_train', 'upper_date_train',
            'upper_date_test', 'baseline', 'optimised',
        ],
    )
if __name__ == '__main__':
    # Script entry point: load the data, run the walk-forward optimisation,
    # save the results to csv and report the elapsed wall-clock time.
    t0 = time.time()

    training_data = get_price_data(TRAINING_TICKERS)
    testing_data = get_price_data(TESTING_TICKERS)

    res = main(training_data, testing_data)
    # The file name records the start date and the train/test window sizes
    res.to_csv(
        f'{START_TRADING} - train_{TRAIN_OVER}_test_{TEST_OVER}_2.csv',
        index=False,
    )

    print('\nFull testing time :', str(time.time()-t0))
