Created
October 30, 2018 12:31
-
-
Save ericbhanson/200deb15519bea99a1e117002ca47697 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import da_ponz.utilities.data_tools as data_tools | |
import da_ponz.utilities.logging_tools as logging_tools | |
import da_ponz.lstm.data_preparation as data_preparation | |
import da_ponz.lstm.model as lstm | |
import da_ponz.utilities.sql_tools as sql_tools | |
import itertools | |
import json | |
import keras | |
import math | |
# All testing is run through a single function, so that it can be called from __main__.py.
def run_testing(logger, settings):
    """Run an LSTM hyperparameter grid search and persist each result to MySQL.

    Every combination of the parameter lists below is exercised via
    itertools.product; each combination is repeated across several batch
    sizes and test runs so the stored results capture an average level of
    performance for a given model configuration.

    :param logger: logger object passed through to the da_ponz helpers for
        progress and result messages.
    :param settings: dict containing at least 'mysql_connection', the
        connection settings used to create the SQLAlchemy engine.
    """
    # Set variables related to data preparation.
    columns = ['timestamp', 'close', 'high', 'open', 'low', 'volume']
    diff_list = [True, False]
    engine = sql_tools.create_engine(settings['mysql_connection'])
    granularity = 60
    k_folds_list = [True, False]
    limit = None
    symbols_list = ['BCH/USD', 'BTC/USD', 'ETC/USD', 'ETH/USD', 'LTC/USD']
    test_size = .25

    # Set variables related to the creation of the LSTM model.
    # NOTE(review): these Activation/Dense objects are single shared instances
    # reused on every grid iteration — confirm that lstm.make_model clones or
    # rebuilds them, since reusing Keras layer objects across models is unsafe.
    activations_list = [keras.layers.Activation('linear')]
    epochs_list = [25, 50, 100, 500, 1000, 2000, 4000]
    layers_list = [[keras.layers.Dense(1)]]
    optimizers_list = ['adam', 'rmsprop', 'sgd']
    scalers_list = [{'feature_range': (0, 1), 'name': 'MinMaxScaler'}]
    stateful_settings = [{'shuffle': False, 'stateful': True}, {'shuffle': True, 'stateful': False},
                         {'shuffle': False, 'stateful': False}]
    test_runs = 31
    units_list = [1, 2, 4, 8, 16]

    # Load the results table to store the results of each test run.
    results_table = sql_tools.get_table(engine, logger, 'results')

    # Loop over the cartesian product of all parameter lists. Unpacking by name
    # (rather than positional var[i] indexing) keeps each value self-describing.
    for (activation, diff, epochs, k_fold, layers, optimizer, scaler_settings,
         stateful_setting, symbol, units) in itertools.product(
            activations_list, diff_list, epochs_list, k_folds_list, layers_list,
            optimizers_list, scalers_list, stateful_settings, symbols_list, units_list):
        scaler_name = scaler_settings['name']

        # Get the test data and turn it into a dataframe.
        test_data = data_tools.get_test_data(columns, engine, logger, limit=limit, order='desc',
                                             where=[granularity, symbol])
        df = data_tools.create_dataframe(columns, test_data, index='timestamp', sort_index=True)

        # Prepare the data for modeling.
        data = data_preparation.prepare_data(diff, df, k_fold, limit, scaler_settings, test_size)

        # Derive human-readable identifiers so the configuration can be stored
        # alongside the numeric results.
        activation_type = activation.get_config()['activation']
        layer_names = ', '.join(layer.get_config()['name'] for layer in layers)
        shuffle = stateful_setting['shuffle']
        stateful = stateful_setting['stateful']

        # Batch size relies on the length of the X sets, so it's calculated separately. Keep only
        # the upper half of the factor list, because the larger batch sizes process more quickly.
        batch_sizes = data_tools.find_common_factors(data['X_test'].shape[0], data['X_train'].shape[0])
        start = len(batch_sizes) // 2

        # Capture the test variables in a dictionary.
        test_results = {'activation': activation_type, 'diff': diff, 'epochs': epochs, 'k_fold': k_fold,
                        'layers': layer_names, 'optimizer': optimizer, 'scaler': scaler_name, 'shuffle': shuffle,
                        'stateful': stateful, 'symbol': symbol, 'units': units,
                        'X_test_length': data['X_test'].shape[0], 'X_train_length': data['X_train'].shape[0],
                        'y_test_length': data['y_test'].shape[0], 'y_train_length': data['y_train'].shape[0]}

        # Determine the tests remaining for this group of batches and test runs. r_tests is a
        # tuple of (remaining batch sizes, parallel list of ranges of remaining run numbers).
        r_tests = data_tools.get_remaining_tests(batch_sizes[start:], engine, logger, test_runs, test_results)

        # r_tests[0] and r_tests[1] are parallel lists: one range of remaining run numbers per
        # remaining batch size, so iterate them in lockstep.
        for batch_size, remaining_runs in zip(r_tests[0], r_tests[1]):
            # Running the same test across multiple iterations captures an average level of
            # performance that helps in evaluating the model settings.
            for test_run in remaining_runs:
                # Add the batch size and test run number to the test variables dictionary.
                test_results['batch_size'] = batch_size
                test_results['test_run'] = test_run

                # Generate a logging message with all of the variables used in the current test.
                logging_tools.log_event('info', logger, test_results, multi_line=True)

                # Build, compile, fit, and evaluate the model using Keras.
                model = lstm.make_model(activation, batch_size, data, layers, stateful, units)
                compiled_model = lstm.compile_model(model, optimizer)
                fit_model = lstm.fit_model(data, batch_size, epochs, compiled_model, shuffle)
                evaluate_model = lstm.evaluate_model(data, batch_size, compiled_model)

                # To prevent errors, test loss needs to be turned into a Python float from a
                # Numpy float64 using item(), while train loss and validation loss need to be
                # turned into JSON strings.
                test_results['test_loss'] = evaluate_model.item()
                test_results['train_loss'] = json.dumps(fit_model.history['loss'])
                test_results['validation_loss'] = json.dumps(fit_model.history['val_loss'])

                # Insert the test results into the test results table and log the results of the query.
                insert = sql_tools.insert_data(engine, results_table, test_results)
                logging_tools.log_event(insert['result'], logger, insert['message'])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment