import da_ponz.utilities.data_tools as data_tools
import da_ponz.utilities.logging_tools as logging_tools
import da_ponz.lstm.data_preparation as data_preparation
import da_ponz.lstm.model as lstm
import da_ponz.utilities.sql_tools as sql_tools
import itertools
import json
import keras
import math
# All testing is run through a single function so that it can be called from __main__.py (see the example
# invocation sketched at the end of this file).
def run_testing(logger, settings):
    # Set variables related to data preparation.
    columns = ['timestamp', 'close', 'high', 'open', 'low', 'volume']
    diff_list = [True, False]
    engine = sql_tools.create_engine(settings['mysql_connection'])
    granularity = 60
    k_folds_list = [True, False]
    limit = None
    symbols_list = ['BCH/USD', 'BTC/USD', 'ETC/USD', 'ETH/USD', 'LTC/USD']
    test_size = 0.25
    # Set variables related to the creation of the LSTM model.
    activations_list = [keras.layers.Activation('linear')]
    epochs_list = [25, 50, 100, 500, 1000, 2000, 4000]
    layers_list = [[keras.layers.Dense(1)]]
    optimizers_list = ['adam', 'rmsprop', 'sgd']
    scalers_list = [{'feature_range': (0, 1), 'name': 'MinMaxScaler'}]
    stateful_settings = [{'shuffle': False, 'stateful': True}, {'shuffle': True, 'stateful': False},
                         {'shuffle': False, 'stateful': False}]
    test_runs = 31
    units_list = [1, 2, 4, 8, 16]
    # Load the results table to store the results of each test run.
    results_table = sql_tools.get_table(engine, logger, 'results')
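    # With the lists above, the Cartesian product below works out to 1 activation x 2 diff settings x
    # 7 epoch counts x 2 k-fold settings x 1 layer set x 3 optimizers x 1 scaler x 3 stateful settings x
    # 5 symbols x 5 unit counts = 6,300 parameter combinations, before batch sizes and repeat test runs
    # are factored in.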
    # Loop through the Cartesian product of all of those variables so that every combination gets tested.
    for var in itertools.product(activations_list, diff_list, epochs_list, k_folds_list, layers_list, optimizers_list,
                                 scalers_list, stateful_settings, symbols_list, units_list):
        # Set the data preparation variables for this loop.
        diff = var[1]
        k_fold = var[3]
        scaler_settings = var[6]
        scaler_name = scaler_settings['name']
        symbol = var[8]
        # Get the test data and turn it into a dataframe.
        test_data = data_tools.get_test_data(columns, engine, logger, limit=limit, order='desc',
                                             where=[granularity, symbol])
        df = data_tools.create_dataframe(columns, test_data, index='timestamp', sort_index=True)
        # Prepare the data for modeling.
        data = data_preparation.prepare_data(diff, df, k_fold, limit, scaler_settings, test_size)
        # Set the model variables for this loop.
        activation = var[0]
        activation_type = activation.get_config()['activation']
        epochs = var[2]
        layers = var[4]
        layer_names = ', '.join([layer.get_config()['name'] for layer in layers])
        optimizer = var[5]
        shuffle = var[7]['shuffle']
        stateful = var[7]['stateful']
        units = var[9]
        # Batch size depends on the lengths of the X sets, so it's calculated separately. Only the larger
        # half of the resulting list is used, because the larger batch sizes should process more quickly.
        batch_sizes = data_tools.find_common_factors(data['X_test'].shape[0], data['X_train'].shape[0])
        start = math.floor(len(batch_sizes) / 2)
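        # Sanity check: assuming find_common_factors returns the factors shared by both set lengths (as its
        # name suggests), every candidate batch size divides the training and test sets evenly, which Keras
        # needs when fitting a stateful LSTM with a fixed batch size.
        assert all(data['X_train'].shape[0] % size == 0 and data['X_test'].shape[0] % size == 0
                   for size in batch_sizes)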
        # Capture the test variables in a dictionary.
        test_results = {'activation': activation_type, 'diff': diff, 'epochs': epochs, 'k_fold': k_fold,
                        'layers': layer_names, 'optimizer': optimizer, 'scaler': scaler_name, 'shuffle': shuffle,
                        'stateful': stateful, 'symbol': symbol, 'units': units,
                        'X_test_length': data['X_test'].shape[0], 'X_train_length': data['X_train'].shape[0],
                        'y_test_length': data['y_test'].shape[0], 'y_train_length': data['y_train'].shape[0]}
        # Determine which batch sizes and test runs still need to be run for this combination; the result is
        # a tuple of (remaining batch sizes, remaining test run ranges).
        r_tests = data_tools.get_remaining_tests(batch_sizes[start:], engine, logger, test_runs, test_results)
        # Loop over the remaining batch sizes (r_tests[0]) by index so that the matching range of remaining
        # test runs (r_tests[1][i]) can be looked up for each batch size.
        for i in range(len(r_tests[0])):
            batch_size = r_tests[0][i]
            # Loop through the corresponding range of remaining test runs (r_tests[1][i]). Repeating the
            # same test across multiple runs captures an average level of performance, which helps in
            # evaluating the model settings.
            for j in r_tests[1][i]:
                # Add the batch size and test run number to the test variables dictionary.
                test_results['batch_size'] = batch_size
                test_results['test_run'] = j
                # Generate a logging message with all of the variables used in the current test.
                logging_tools.log_event('info', logger, test_results, multi_line=True)
                # Build, compile, fit, and evaluate the model using Keras.
                model = lstm.make_model(activation, batch_size, data, layers, stateful, units)
                compiled_model = lstm.compile_model(model, optimizer)
                fit_model = lstm.fit_model(data, batch_size, epochs, compiled_model, shuffle)
                evaluate_model = lstm.evaluate_model(data, batch_size, compiled_model)
                # Add the results to the test variables dictionary. To prevent insert errors, the test loss
                # needs to be converted from a NumPy float64 to a Python float with item(), while the train
                # loss and validation loss histories need to be serialized to JSON strings.
                test_results['test_loss'] = evaluate_model.item()
                test_results['train_loss'] = json.dumps(fit_model.history['loss'])
                test_results['validation_loss'] = json.dumps(fit_model.history['val_loss'])
                # Insert the test results into the results table and log the outcome of the query.
                insert = sql_tools.insert_data(engine, results_table, test_results)
                logging_tools.log_event(insert['result'], logger, insert['message'])
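

# A minimal sketch of how this function might be invoked, e.g. from the project's __main__.py. The settings
# file name and format and the use of the standard logging module are assumptions for illustration; the
# original project's logging_tools and settings handling may differ.
if __name__ == '__main__':
    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger('da_ponz')

    # Assumes a JSON settings file containing at least the 'mysql_connection' string used above.
    with open('settings.json') as settings_file:
        settings = json.load(settings_file)

    run_testing(logger, settings)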