@Thomas-X
Created May 7, 2021 15:12
a horrible, basic model that uses tensorflow
import os
import IPython
import IPython.display
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from datetime import datetime
import seaborn as sns
import tensorflow as tf
from sklearn.preprocessing import MinMaxScaler
from ta.utils import dropna
from ta import add_all_ta_features
mpl.rcParams['figure.figsize'] = (8, 6)
mpl.rcParams['axes.grid'] = False
MAX_EPOCHS = 10
def download_dataset():
    dataframe = pd.read_json("data_30m.json")
    dataframe.columns = ['time', 'open', 'high', 'low', 'close', 'volume']
    dataframe = dropna(dataframe)
    dataframe = add_all_ta_features(dataframe, open="open", high="high", low="low", close="close", volume="volume")
    time = dataframe.pop('time')
    # give "most" indicators a chance to warm up, since they can produce NaN values at the start of the data
    dataframe = dataframe[300:]
    # manual removal of features that are either NaN or unused in training
    dataframe.pop('open')
    dataframe.pop('high')
    dataframe.pop('low')
    dataframe.pop('volume')
    dataframe.pop('trend_psar_up')
    dataframe.pop('trend_psar_down')
    # print(dataframe.isna().any())
    # print(dataframe.columns[dataframe.isna().any()].tolist())
    return dataframe, time
def show_features_in_plot():
    plot_cols = ['close']
    plot_features = df[plot_cols]
    plot_features.index = date_time
    _ = plot_features.plot(subplots=True)
    # plt.show() is required outside notebooks (e.g. on Windows)
    plt.show()
    plot_features = df[plot_cols][:480]
    plot_features.index = date_time[:480]
    _ = plot_features.plot(subplots=True)
    plt.show()
df, date_time = download_dataset()
# show_features_in_plot()
# currently unused: formats the millisecond timestamps as date strings
timestamp_s = date_time.map(lambda x: datetime.utcfromtimestamp(x / 1000).strftime('%Y-%m-%d %H:%M:%S'))
# 70% training, 20% validation, 10% test dataset splitting
column_indices = {name: i for i, name in enumerate(df.columns)}
n = len(df)
# 0-0.7
train_df = df[0:int(n * 0.7)].copy()
# 0.7-0.9
val_df = df[int(n * 0.7):int(n * 0.9)].copy()
# 0.9-1.0
test_df = df[int(n * 0.9):].copy()
num_features = df.shape[1]
# scale features (each column individually) to the 0-1 range
# note: fitting a separate scaler per split means each split is scaled with
# its own min/max; the conventional approach is to fit on train and reuse it
train_scaler = MinMaxScaler()
val_scaler = MinMaxScaler()
test_scaler = MinMaxScaler()
train_df[train_df.columns] = train_scaler.fit_transform(train_df[train_df.columns])
val_df[val_df.columns] = val_scaler.fit_transform(val_df[val_df.columns])
test_df[test_df.columns] = test_scaler.fit_transform(test_df[test_df.columns])
# TODO figure out inversing of these transforms
# test_df[test_df.columns] = test_scaler.inverse_transform(test_df[test_df.columns])
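# A minimal sketch for the TODO above (hypothetical helper, not part of the
# original pipeline): MinMaxScaler stores per-column parameters (scale_ and
# min_), so one column such as 'close' can be un-scaled on its own without
# rebuilding the full feature matrix.
# def unscale_column(scaler, scaled_values, column_name, columns=None):
#     """Invert MinMaxScaler for one column: x = (x_scaled - min_) / scale_."""
#     i = list(columns if columns is not None else df.columns).index(column_name)
#     return (np.asarray(scaled_values) - scaler.min_[i]) / scaler.scale_[i]
# e.g. unscale_column(test_scaler, scaled_close_predictions, 'close')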
# data windowing
# https://www.tensorflow.org/tutorials/structured_data/time_series#data_windowing
class WindowGenerator:
    # note: the constructor defaults shadow the global train_df/val_df/test_df,
    # which is easy to misread
    def __init__(self, input_width, label_width, shift, train_df=train_df, val_df=val_df, test_df=test_df,
                 label_columns=None):
        # Store the raw data.
        self.train_df = train_df
        self.val_df = val_df
        self.test_df = test_df
        # Work out the label column indices.
        self.label_columns = label_columns
        if label_columns is not None:
            self.label_columns_indices = {name: i for i, name in
                                          enumerate(label_columns)}
        self.column_indices = {name: i for i, name in
                               enumerate(train_df.columns)}
        # Work out the window parameters. 'shift' is how far past the inputs
        # the labels start; e.g. with input_width=3, label_width=1, shift=1,
        # a window over [1,2,3,4] splits into inputs [1,2,3] and label [4].
        self.input_width = input_width
        self.label_width = label_width
        self.shift = shift
        self.total_window_size = input_width + shift
        self.input_slice = slice(0, input_width)
        self.input_indices = np.arange(self.total_window_size)[self.input_slice]
        self.label_start = self.total_window_size - self.label_width
        self.labels_slice = slice(self.label_start, None)
        self.label_indices = np.arange(self.total_window_size)[self.labels_slice]
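        # Worked example (derived from the definitions above): wide_window
        # below (input_width=24, label_width=24, shift=1) gives
        # total_window_size 25, input_indices [0..23], label_indices [1..24];
        # multi_window (input_width=128, shift=OUT_STEPS=4) gives
        # total_window_size 132 and label_indices [128..131].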
    def split_window(self, features):
        inputs = features[:, self.input_slice, :]
        labels = features[:, self.labels_slice, :]
        if self.label_columns is not None:
            labels = tf.stack(
                [labels[:, :, self.column_indices[name]] for name in self.label_columns],
                axis=-1)
        # Slicing doesn't preserve static shape information, so set the shapes
        # manually. This way the `tf.data.Datasets` are easier to inspect.
        inputs.set_shape([None, self.input_width, None])
        labels.set_shape([None, self.label_width, None])
        return inputs, labels
    def plot(self, model=None, plot_col='close', max_subplots=3):
        inputs, labels = self.example
        plt.figure(figsize=(12, 8))
        plot_col_index = self.column_indices[plot_col]
        max_n = min(max_subplots, len(inputs))
        for n in range(max_n):
            plt.subplot(3, 1, n + 1)
            plt.ylabel(f'{plot_col} [normed]')
            plt.plot(self.input_indices, inputs[n, :, plot_col_index],
                     label='Inputs', marker='.', zorder=-10)
            if self.label_columns:
                label_col_index = self.label_columns_indices.get(plot_col, None)
            else:
                label_col_index = plot_col_index
            if label_col_index is None:
                continue
            plt.scatter(self.label_indices, labels[n, :, label_col_index],
                        edgecolors='k', label='Labels', c='#2ca02c', s=64)
            if model is not None:
                predictions = model(inputs)
                plt.scatter(self.label_indices, predictions[n, :, label_col_index],
                            marker='X', edgecolors='k', label='Predictions',
                            c='#ff7f0e', s=64)
            if n == 0:
                plt.legend()
        plt.xlabel('Time [30m candlesticks]')
    # make_dataset takes a time-series DataFrame and converts it to a
    # tf.data.Dataset of (input_window, label_window) pairs using
    # tf.keras.preprocessing.timeseries_dataset_from_array.
    def make_dataset(self, data):
        data = np.array(data, dtype=np.float32)
        ds = tf.keras.preprocessing.timeseries_dataset_from_array(
            data=data,
            targets=None,
            sequence_length=self.total_window_size,
            sequence_stride=1,
            shuffle=True,
            batch_size=32, )
        # ds is a tf.data.Dataset
        ds = ds.map(self.split_window)
        return ds
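    # Each element of the resulting dataset is an (inputs, labels) pair:
    # inputs [batch<=32, input_width, num_features] and labels
    # [batch<=32, label_width, len(label_columns) or num_features].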
    def __repr__(self):
        return '\n'.join([
            f'Total window size: {self.total_window_size}',
            f'Input indices: {self.input_indices}',
            f'Label indices: {self.label_indices}',
            f'Label column name(s): {self.label_columns}'])

    @property
    def train(self):
        return self.make_dataset(self.train_df)

    @property
    def val(self):
        return self.make_dataset(self.val_df)

    @property
    def test(self):
        return self.make_dataset(self.test_df)

    @property
    def example(self):
        """Get and cache an example batch of `inputs, labels` for plotting."""
        result = getattr(self, '_example', None)
        if result is None:
            # No example batch was found, so get one from the `.train` dataset
            result = next(iter(self.train))
            # And cache it for next time
            self._example = result
        return result
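# Usage sketch (mirrors the TensorFlow tutorial; not invoked here): inspect a
# window and one example batch before training.
# w = WindowGenerator(input_width=24, label_width=24, shift=1,
#                     label_columns=['close'])
# print(w)  # __repr__: window size, input/label indices, label columns
# inputs, labels = w.example
# print(inputs.shape, labels.shape)  # e.g. (32, 24, num_features), (32, 24, 1)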
# baseline model for comparison against the other models:
# it simply predicts "no change", i.e. the output equals the current input
class Baseline(tf.keras.Model):
    def __init__(self, label_index=None):
        super().__init__()
        self.label_index = label_index

    def call(self, inputs):
        if self.label_index is None:
            return inputs
        result = inputs[:, :, self.label_index]
        return result[:, :, tf.newaxis]
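# With label_index set, Baseline maps [batch, time, features] to
# [batch, time, 1] by echoing the selected input column as the prediction
# for the shifted labels, so its error is the "no change" floor that any
# trained model should beat.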
def compile_and_fit(model, window, patience=2):
    # early stopping is disabled for now (note callbacks=[] in model.fit)
    # TODO enable early stopping again
    early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss',
                                                      patience=patience,
                                                      mode='min')
    model.compile(loss=tf.losses.MeanSquaredError(),
                  optimizer=tf.optimizers.Adam(learning_rate=0.005),
                  metrics=[tf.metrics.MeanSquaredError()])
    history = model.fit(window.train, epochs=MAX_EPOCHS,
                        validation_data=window.val,
                        callbacks=[])
    return history
# wide window for baseline performance
wide_window = WindowGenerator(
    input_width=24, label_width=24, shift=1,
    label_columns=['close'])
# single step window
single_step_window = WindowGenerator(
    input_width=1, label_width=1, shift=1,
    label_columns=['close'])
# convolutional window
CONV_WIDTH = 3
conv_window = WindowGenerator(
    input_width=CONV_WIDTH, label_width=1,
    shift=1, label_columns=['close'])
# multi-step window
OUT_STEPS = 4
multi_window = WindowGenerator(input_width=128,
                               label_width=OUT_STEPS,
                               shift=OUT_STEPS)
val_performance = {}
performance = {}
multi_val_performance = {}
multi_performance = {}
def create_baseline_model():
    baseline = Baseline(label_index=column_indices['close'])
    # note: compiled with an MAE metric, while compile_and_fit uses MSE
    baseline.compile(loss=tf.losses.MeanSquaredError(),
                     metrics=[tf.metrics.MeanAbsoluteError()])
    val_performance['Baseline'] = baseline.evaluate(single_step_window.val)
    performance['Baseline'] = baseline.evaluate(single_step_window.test, verbose=0)
    return baseline
def create_linear_model():
    linear = tf.keras.Sequential([
        tf.keras.layers.Dense(units=1)
    ])
    history = compile_and_fit(linear, single_step_window)
    val_performance['Linear'] = linear.evaluate(single_step_window.val)
    performance['Linear'] = linear.evaluate(single_step_window.test, verbose=0)
    return linear, history
def create_dense_model():
    dense = tf.keras.Sequential([
        tf.keras.layers.Dense(units=64, activation='relu'),
        tf.keras.layers.Dense(units=64, activation='relu'),
        tf.keras.layers.Dense(units=1)
    ])
    history = compile_and_fit(dense, single_step_window)
    val_performance['Dense'] = dense.evaluate(single_step_window.val)
    performance['Dense'] = dense.evaluate(single_step_window.test, verbose=0)
    return dense, history
def create_lstm_model():
    lstm_model = tf.keras.models.Sequential([
        # Shape [batch, time, features] => [batch, time, lstm_units]
        tf.keras.layers.LSTM(32, return_sequences=True),
        # Shape => [batch, time, 1]
        tf.keras.layers.Dense(units=1)
    ])
    history = compile_and_fit(lstm_model, wide_window)
    val_performance['LSTM'] = lstm_model.evaluate(wide_window.val)
    performance['LSTM'] = lstm_model.evaluate(wide_window.test, verbose=0)
    return lstm_model, history
def create_multi_step_baseline():
    class MultiStepLastBaseline(tf.keras.Model):
        def call(self, inputs):
            return tf.tile(inputs[:, -1:, :], [1, OUT_STEPS, 1])

    last_baseline = MultiStepLastBaseline()
    last_baseline.compile(loss=tf.losses.MeanSquaredError(),
                          metrics=[tf.metrics.MeanAbsoluteError()])
    multi_val_performance['Last'] = last_baseline.evaluate(multi_window.val)
    multi_performance['Last'] = last_baseline.evaluate(multi_window.test, verbose=0)
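# Shape walk-through for MultiStepLastBaseline: inputs[:, -1:, :] keeps only
# the last time step ([batch, 1, features]); tf.tile repeats it OUT_STEPS
# times along the time axis ([batch, OUT_STEPS, features]), i.e. "the last
# observed value persists" for every forecast step.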
def create_lstm_multistep_model():
    multi_lstm_model = tf.keras.Sequential([
        # Shape [batch, time, features] => [batch, lstm_units]
        # Adding more `lstm_units` just overfits more quickly.
        tf.keras.layers.LSTM(32, return_sequences=False),
        # Shape => [batch, out_steps*features]
        tf.keras.layers.Dense(OUT_STEPS * num_features,
                              kernel_initializer=tf.initializers.zeros),
        # Shape => [batch, out_steps, features]
        tf.keras.layers.Reshape([OUT_STEPS, num_features])
    ])
    history = compile_and_fit(multi_lstm_model, multi_window)
    multi_val_performance['LSTM'] = multi_lstm_model.evaluate(multi_window.val)
    multi_performance['LSTM'] = multi_lstm_model.evaluate(multi_window.test, verbose=0)
    return multi_lstm_model, history
def plot_performance(lstm_model):
    x = np.arange(len(performance))
    width = 0.3
    metric_index = lstm_model.metrics_names.index('mean_squared_error')
    val_mse = [v[metric_index] for v in val_performance.values()]
    test_mse = [v[metric_index] for v in performance.values()]
    plt.ylabel('mean_squared_error [close, normalized]')
    plt.bar(x - 0.17, val_mse, width, label='Validation')
    plt.bar(x + 0.17, test_mse, width, label='Test')
    plt.xticks(ticks=x, labels=performance.keys(),
               rotation=45)
    _ = plt.legend()
    plt.show()
def plot_multistep_performance(lstm_model):
    x = np.arange(len(multi_performance))
    width = 0.3
    metric_index = lstm_model.metrics_names.index('mean_squared_error')
    val_mse = [v[metric_index] for v in multi_val_performance.values()]
    test_mse = [v[metric_index] for v in multi_performance.values()]
    plt.bar(x - 0.17, val_mse, width, label='Validation')
    plt.bar(x + 0.17, test_mse, width, label='Test')
    plt.xticks(ticks=x, labels=multi_performance.keys(),
               rotation=45)
    plt.ylabel('MSE (average over all times and outputs)')
    _ = plt.legend()
    plt.show()
# baseline = create_baseline_model()
# linear, linear_history = create_linear_model()
# dense, dense_history = create_dense_model()
# lstm, lstm_history = create_lstm_model()
create_multi_step_baseline()
lstm_multistep, lstm_multistep_history = create_lstm_multistep_model()
# plot performances
plot_multistep_performance(lstm_multistep)
# single_step_window.plot(linear)
# plt.show()
# wide_window.plot(lstm)
# print('single step prediction performance')
# print(performance)
# print(val_performance)
# plt.show()
print('multi step prediction performance')
multi_window.plot(lstm_multistep)
print(multi_performance)
print(multi_val_performance)
plt.show()