@spacegoing
Created November 6, 2019 11:31
# -*- coding: utf-8 -*-
from keras.models import Sequential
from keras.layers import Dense, LSTM
import numpy as np
from numpy.random import choice


def prepare_sequences(x_train, window_length):
  # Slice every sequence into all of its overlapping windows of length
  # window_length (uses the global sequence length T).
  windows = []
  for i, sequence in enumerate(x_train):
    for window_start in range(0, T - window_length + 1):
      window_end = window_start + window_length
      window = sequence[window_start:window_end]
      windows.append(window)
  return np.array(windows)
def get_sequential_batch(bX_train, bY_train, N_train, batch_size):
  # Yield temporally aligned batches: for a fixed group of batch_size
  # sequences, emit their window at step 0, then step 1, ..., so that
  # sample i of one batch is continued by sample i of the next batch
  # (the layout a stateful LSTM expects).
  bX_train = bX_train.reshape(N_train, T - window_length + 1, window_length)
  N = N_train - N_train % batch_size
  for i in range(0, N, batch_size):
    for t in range(T - window_length + 1):
      bX = bX_train[i:i + batch_size, t, :]
      bY = bY_train[i:i + batch_size]
      yield bX[..., np.newaxis], bY[..., np.newaxis]
## hyper parameters
debug = True
N = 1200
T = 20
N_train = 1000
N_test = N - N_train
window_length = 10
batch_size = 32
epochs = 4
# if stateful=True, test acc = 1.0; if stateful=False, test acc = 0.5
stateful = False
## create train / test dataset
data = np.zeros([N, T])
one_indexes = choice(a=N, size=N // 2, replace=False)
data[one_indexes, 0] = 1  # the label depends only on the very first time step (long-term memory)
X_train = data[:N_train]
Y_train = X_train[:, 0]
X_test = data[N_train:]
Y_test = X_test[:, 0]
## create model
model = Sequential()
model.add(
    LSTM(
        3,
        batch_input_shape=(batch_size, window_length, 1),
        return_sequences=False,
        stateful=stateful))
model.add(Dense(1, activation='sigmoid'))
model.compile(
    loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
## training loop
for e in range(epochs):
  # train data generator
  bX_train = prepare_sequences(X_train, window_length)
  x_train_batch_gen = get_sequential_batch(bX_train, Y_train, N_train,
                                           batch_size)
  tr_acc = []
  tr_loss = []
  # debug
  t_dataset = []
  counter = 0
  for bX, bY in x_train_batch_gen:
    loss, acc = model.train_on_batch(bX, bY)
    tr_loss.append(loss)
    tr_acc.append(acc)
    counter += 1
    # debug: sanity check on the first batch of each sequence group
    if counter == 1 and debug:
      t_dataset.append(
          sum(bY[:, 0] == bX[:, 0, :].reshape(-1)) + int(bX.sum() == bY.sum()))
    # reset states after the last window of the current group of sequences
    if counter == T - window_length + 1:
      model.reset_states()
      counter = 0
  print(np.mean(tr_acc))
  # debug
  if debug:
    print(np.mean(t_dataset))
## testing loop
bX_test = prepare_sequences(X_test, window_length)
x_test_batch_gen = get_sequential_batch(bX_test, Y_test, N_test, batch_size)
test_tr_acc = []
test_tr_loss = []
test_dataset = []
counter = 0
for bX, bY in x_test_batch_gen:
  loss, acc = model.test_on_batch(bX, bY)
  test_tr_loss.append(loss)
  test_tr_acc.append(acc)
  counter += 1
  # debug: sanity check on the first batch of each sequence group
  if counter == 1 and debug:
    test_dataset.append(
        sum(bY[:, 0] == bX[:, 0, :].reshape(-1)) + int(bX.sum() == bY.sum()))
  # reset states after the last window of the current group of sequences
  if counter == T - window_length + 1:
    model.reset_states()
    counter = 0
print(np.mean(test_tr_acc))
# debug
if debug:
  print(np.mean(test_dataset))
@spacegoing (Author) commented:
The example used in the "Impact of sequences subsampling" section is incorrect.
For the stateful parameter to take effect, the batches must be temporally aligned; the output of prepare_sequences(x_train, window_length) from the original post cannot be fed to the model directly. Please refer to get_sequential_batch in this gist to see how the temporal alignment is done.
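To make "temporally aligned" concrete, here is a minimal, self-contained sketch. The sizes (4 sequences of length 5, window_length 3, batch_size 2) are made up for illustration and are not the gist's hyperparameters. It prints batches in the same order get_sequential_batch yields them: a fixed group of batch_size sequences is stepped through time before moving to the next group, so sample i of one batch is continued by sample i of the next batch, which is what stateful=True requires.

import numpy as np

T, window_length, batch_size = 5, 3, 2
sequences = np.arange(4 * T).reshape(4, T)  # 4 toy sequences of length T

# all sliding windows, grouped per sequence:
# shape (4, T - window_length + 1, window_length)
windows = np.array([[seq[s:s + window_length]
                     for s in range(T - window_length + 1)]
                    for seq in sequences])

# temporally aligned batching: fix a group of batch_size sequences, then
# step through their windows in time order before moving to the next group
for i in range(0, len(sequences), batch_size):
  for t in range(T - window_length + 1):
    print("group %d, step %d:" % (i // batch_size, t))
    print(windows[i:i + batch_size, t, :])
  # this is where the gist calls model.reset_states(): the next group of
  # sequences starts from a fresh LSTM state

With this ordering, carrying hidden state across batches is meaningful, which is why the stateful model can remember the bit stored at time step 0 even for windows that no longer contain it.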
