Skip to content

Instantly share code, notes, and snippets.

@r0adrunner
Created September 7, 2019 18:31
Show Gist options
  • Save r0adrunner/db710ca66e2350ad21910ab527b69354 to your computer and use it in GitHub Desktop.
Save r0adrunner/db710ca66e2350ad21910ab527b69354 to your computer and use it in GitHub Desktop.
import numpy as np
import matplotlib.pyplot as plt
from pandas import read_csv
from keras.models import Sequential
from keras.layers import LSTM, Dense
from keras.utils import to_categorical
from keras.optimizers import SGD, Adagrad, Adam
# --- Input files -------------------------------------------------------------
train_csv = '/home/temp/test-seqs/s2.csv'
test_csv = '/home/temp/test-seqs/s2-test.csv'

# --- Data layout and labels --------------------------------------------------
timesteps = 21           # sequence columns read per row (CSV columns 1..timesteps)
n_features = 18          # one-hot width used by encode_sequence and the LSTM input
n_samples = 1000 # < 20000
positives_mean = 5       # target value that separate_y treats as "positive"
negatives_mean = -5      # target value that separate_y treats as "negative"
no_order_match_val = 100  # NOTE(review): unused in the visible code; presumably the "no order" target - confirm


def _load_columns(csv_path, columns, dtype, nrows=None):
    """Read the selected columns of a header-less CSV into a NumPy array.

    Args:
        csv_path: Path of the CSV file to read.
        columns: Column positions to keep (forwarded as ``usecols``).
        dtype: NumPy scalar type every selected column is parsed as.
        nrows: Maximum number of rows to read; ``None`` reads the whole file.

    Returns:
        A 2-D array with one row per CSV row and one column per selected
        CSV column.
    """
    frame = read_csv(
        csv_path,
        header=None,
        dtype=np.dtype(dtype),
        nrows=nrows,
        usecols=columns,
    )
    return frame.to_numpy()


# Column 0 of each file is the regression target; the following `timesteps`
# columns are the integer-coded sequence.
x = _load_columns(train_csv, range(1, timesteps + 1), np.uint8, nrows=n_samples)
y = _load_columns(train_csv, [0], np.float32, nrows=n_samples)
x_test = _load_columns(test_csv, range(1, timesteps + 1), np.uint8)
y_test = _load_columns(test_csv, [0], np.float32)
# One-hot encode: from [samples, timesteps] into [samples, timesteps, features].
def encode_sequence(sequence, num_classes=None):
    """One-hot encode an array of integer class indices.

    Args:
        sequence: Array-like of integer class indices, typically shaped
            ``[samples, timesteps]``.
        num_classes: Length of the one-hot axis. When omitted, the
            module-level ``n_features`` is used.

    Returns:
        A ``uint8`` array shaped ``sequence.shape + (num_classes,)`` holding
        a single 1 per innermost vector, at the position given by the index.

    Raises:
        IndexError: If any index is ``>= num_classes``.
    """
    if num_classes is None:
        num_classes = n_features
    indices = np.asarray(sequence, dtype=np.intp)
    # Row i of the identity matrix is the one-hot vector for class i, so
    # indexing it with the whole index array encodes every element at once
    # instead of looping over the samples in Python.
    return np.eye(num_classes, dtype='uint8')[indices]
# One-hot encode the training sequences for the network.
Xe = encode_sequence(x)

# Four stacked LSTM layers (35 -> 25 -> 15 -> 5 units, all with ReLU
# activation) feeding a single-unit Dense output. Every LSTM except the last
# returns its full output sequence so the next LSTM gets one vector per
# timestep; the last one returns only its final output.
model = Sequential([
    LSTM(35, activation='relu', return_sequences=True,
         input_shape=(timesteps, n_features)),
    LSTM(25, activation='relu', return_sequences=True),
    LSTM(15, activation='relu', return_sequences=True),
    LSTM(5, activation='relu'),
    Dense(1),
])

# Alternative optimizers kept for reference:
#   SGD(lr=0.001, momentum=0.9)
#   Adagrad()
opt = Adam(lr=0.001)
model.compile(loss='mse', optimizer=opt)

# Fit: a single epoch with one sample per batch.
model.fit(Xe, y, epochs=1, batch_size=1, verbose=1)

# Predict on the test sequences, encoded the same way as the training set.
Xe_test = encode_sequence(x_test)
yhat = model.predict(Xe_test, verbose=0)
# Separate predictions into positives, negatives and everything else.
def separate_y(y_pred, y_test, positive_label=None, negative_label=None):
    """Group predictions by the true label of the matching sample.

    Args:
        y_pred: Sequence of predictions, one entry per sample.
        y_test: Sequence of true labels aligned with ``y_pred``; it must be
            at least as long as ``y_pred``.
        positive_label: Label value marking a positive sample. When omitted,
            the module-level ``positives_mean`` is used.
        negative_label: Label value marking a negative sample. When omitted,
            the module-level ``negatives_mean`` is used.

    Returns:
        A tuple ``(positives, negatives, no_order)`` of ``float32`` arrays
        holding the predictions whose label equals ``positive_label``,
        equals ``negative_label``, or matches neither, in input order.

    Raises:
        IndexError: If ``y_test`` has fewer entries than ``y_pred``.
    """
    if positive_label is None:
        positive_label = positives_mean
    if negative_label is None:
        negative_label = negatives_mean
    # zip() would silently truncate; keep the IndexError that indexing
    # y_test past its end used to raise.
    if len(y_test) < len(y_pred):
        raise IndexError('y_test has fewer entries than y_pred')

    positives, negatives, no_order = [], [], []
    for prediction, label in zip(y_pred, y_test):
        if label == positive_label:
            positives.append(prediction)
        elif label == negative_label:
            negatives.append(prediction)
        else:
            no_order.append(prediction)
    return (
        np.array(positives, dtype='float32'),
        np.array(negatives, dtype='float32'),
        np.array(no_order, dtype='float32'),
    )
# Plot the predictions as three overlaid step histograms (100 bins), one per
# group returned by separate_y: positives, negatives, everything else.
colors = ['green', 'red', 'blue']
# Flatten every group to 1-D first. separate_y keeps the per-sample shape of
# the predictions (likely (k, 1) column vectors here - confirm against
# model.predict), and a sequence of 1-D datasets is the multi-dataset input
# form plt.hist documents.
prediction_groups = [np.ravel(group) for group in separate_y(yhat, y_test)]
plt.hist(prediction_groups, 100, color=colors, alpha=1, stacked=False, fill=False, histtype='step')
fig1 = plt.gcf()
# Write the image before show(): show() blocks until the window is closed,
# and the figure may no longer be drawable once it returns.
fig1.savefig('hist.png')
plt.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment