Skip to content

Instantly share code, notes, and snippets.

@fanwei918
Forked from jkleint/timeseries_cnn.py
Created November 28, 2017 16:30
Show Gist options
  • Save fanwei918/4266991e57d875e64abf5689a1f1849c to your computer and use it in GitHub Desktop.
Save fanwei918/4266991e57d875e64abf5689a1f1849c to your computer and use it in GitHub Desktop.
Example of using Keras to implement a 1D convolutional neural network (CNN) for timeseries prediction.
#!/usr/bin/env python
"""
Example of using Keras to implement a 1D convolutional neural network (CNN) for timeseries prediction.
"""
from __future__ import print_function, division
import numpy as np
from keras.layers import Convolution1D, Dense, MaxPooling1D, Flatten
from keras.models import Sequential
__date__ = '2016-07-22'
def make_timeseries_regressor(window_size, filter_length, nb_input_series=1, nb_outputs=1, nb_filter=4):
""":Return: a Keras Model for predicting the next value in a timeseries given a fixed-size lookback window of previous values.
The model can handle multiple input timeseries (`nb_input_series`) and multiple prediction targets (`nb_outputs`).
:param int window_size: The number of previous timeseries values to use as input features. Also called lag or lookback.
:param int nb_input_series: The number of input timeseries; 1 for a single timeseries.
The `X` input to ``fit()`` should be an array of shape ``(n_instances, window_size, nb_input_series)``; each instance is
a 2D array of shape ``(window_size, nb_input_series)``. For example, for `window_size` = 3 and `nb_input_series` = 1 (a
single timeseries), one instance could be ``[[0], [1], [2]]``. See ``make_timeseries_instances()``.
:param int nb_outputs: The output dimension, often equal to the number of inputs.
For each input instance (array with shape ``(window_size, nb_input_series)``), the output is a vector of size `nb_outputs`,
usually the value(s) predicted to come after the last value in that input instance, i.e., the next value
in the sequence. The `y` input to ``fit()`` should be an array of shape ``(n_instances, nb_outputs)``.
:param int filter_length: the size (along the `window_size` dimension) of the sliding window that gets convolved with
each position along each instance. The difference between 1D and 2D convolution is that a 1D filter's "height" is fixed
to the number of input timeseries (its "width" being `filter_length`), and it can only slide along the window
dimension. This is useful as generally the input timeseries have no spatial/ordinal relationship, so it's not
meaningful to look for patterns that are invariant with respect to subsets of the timeseries.
:param int nb_filter: The number of different filters to learn (roughly, input patterns to recognize).
"""
model = Sequential((
# The first conv layer learns `nb_filter` filters (aka kernels), each of size ``(filter_length, nb_input_series)``.
# Its output will have shape (None, window_size - filter_length + 1, nb_filter), i.e., for each position in
# the input timeseries, the activation of each filter at that position.
Convolution1D(nb_filter=nb_filter, filter_length=filter_length, activation='relu', input_shape=(window_size, nb_input_series)),
MaxPooling1D(), # Downsample the output of convolution by 2X.
Convolution1D(nb_filter=nb_filter, filter_length=filter_length, activation='relu'),
MaxPooling1D(),
Flatten(),
Dense(nb_outputs, activation='linear'), # For binary classification, change the activation to 'sigmoid'
))
model.compile(loss='mse', optimizer='adam', metrics=['mae'])
# To perform (binary) classification instead:
# model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['binary_accuracy'])
return model
def make_timeseries_instances(timeseries, window_size):
"""Make input features and prediction targets from a `timeseries` for use in machine learning.
:return: A tuple of `(X, y, q)`. `X` are the inputs to a predictor, a 3D ndarray with shape
``(timeseries.shape[0] - window_size, window_size, timeseries.shape[1] or 1)``. For each row of `X`, the
corresponding row of `y` is the next value in the timeseries. The `q` or query is the last instance, what you would use
to predict a hypothetical next (unprovided) value in the `timeseries`.
:param ndarray timeseries: Either a simple vector, or a matrix of shape ``(timestep, series_num)``, i.e., time is axis 0 (the
row) and the series is axis 1 (the column).
:param int window_size: The number of samples to use as input prediction features (also called the lag or lookback).
"""
timeseries = np.asarray(timeseries)
assert 0 < window_size < timeseries.shape[0]
X = np.atleast_3d(np.array([timeseries[start:start + window_size] for start in range(0, timeseries.shape[0] - window_size)]))
y = timeseries[window_size:]
q = np.atleast_3d([timeseries[-window_size:]])
return X, y, q
def evaluate_timeseries(timeseries, window_size):
"""Create a 1D CNN regressor to predict the next value in a `timeseries` using the preceding `window_size` elements
as input features and evaluate its performance.
:param ndarray timeseries: Timeseries data with time increasing down the rows (the leading dimension/axis).
:param int window_size: The number of previous timeseries values to use to predict the next.
"""
filter_length = 5
nb_filter = 4
timeseries = np.atleast_2d(timeseries)
if timeseries.shape[0] == 1:
timeseries = timeseries.T # Convert 1D vectors to 2D column vectors
nb_samples, nb_series = timeseries.shape
print('\n\nTimeseries ({} samples by {} series):\n'.format(nb_samples, nb_series), timeseries)
model = make_timeseries_regressor(window_size=window_size, filter_length=filter_length, nb_input_series=nb_series, nb_outputs=nb_series, nb_filter=nb_filter)
print('\n\nModel with input size {}, output size {}, {} conv filters of length {}'.format(model.input_shape, model.output_shape, nb_filter, filter_length))
model.summary()
X, y, q = make_timeseries_instances(timeseries, window_size)
print('\n\nInput features:', X, '\n\nOutput labels:', y, '\n\nQuery vector:', q, sep='\n')
test_size = int(0.01 * nb_samples) # In real life you'd want to use 0.2 - 0.5
X_train, X_test, y_train, y_test = X[:-test_size], X[-test_size:], y[:-test_size], y[-test_size:]
model.fit(X_train, y_train, nb_epoch=25, batch_size=2, validation_data=(X_test, y_test))
pred = model.predict(X_test)
print('\n\nactual', 'predicted', sep='\t')
for actual, predicted in zip(y_test, pred.squeeze()):
print(actual.squeeze(), predicted, sep='\t')
print('next', model.predict(q).squeeze(), sep='\t')
def main():
"""Prepare input data, build model, evaluate."""
np.set_printoptions(threshold=25)
ts_length = 1000
window_size = 50
print('\nSimple single timeseries vector prediction')
timeseries = np.arange(ts_length) # The timeseries f(t) = t
evaluate_timeseries(timeseries, window_size)
print('\nMultiple-input, multiple-output prediction')
timeseries = np.array([np.arange(ts_length), -np.arange(ts_length)]).T # The timeseries f(t) = [t, -t]
evaluate_timeseries(timeseries, window_size)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment