from glob import iglob
from sample_RNN import my_RNN
import matplotlib.pyplot as plt
import numpy as np
import pickle
from sklearn.model_selection import train_test_split
def load_data(num_of_sample, length_of_sequences):
    """
    Arguments
        num_of_sample: number of samples to load
        length_of_sequences: sequence length (currently unused; the full
            sequences are loaded and windowed later)
    Returns
        X: list of num_of_sample input arrays
        t: list of num_of_sample target arrays
    """
    X = []
    t = []
    for i, fn in enumerate(iglob("../work/F/*.pkl")):
        if i >= num_of_sample:
            break
        with open(fn, "rb") as f:
            x, y = pickle.load(f)  # avoid shadowing the file handle f
        X.append(x.ravel())
        t.append(y.ravel())
    return X, t
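# NOTE: each ../work/F/*.pkl is assumed to hold a pickled (input, target)
# pair of equal-length arrays; this is inferred from the unpacking above,
# not documented in the gist itself.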
batch_size = 2500
length_of_sequences = 5
num_of_sample = 3000
num_of_hidden_nodes = 80
num_of_training_epochs = 2000
size_of_mini_batch = 1
data_x, data_t = load_data(num_of_sample, "dummy")  # 2nd argument is unused
# Cut each sequence at 2500 samples and split it into windows of length 5.
reshaped_x, reshaped_t = [], []
for x, t in zip(data_x, data_t):
    reshaped_x.extend(
        np.split(x[:batch_size], batch_size // length_of_sequences))
    reshaped_t.extend(
        np.split(t[:batch_size], batch_size // length_of_sequences))
X_train, X_test, y_train, y_test = train_test_split(
    reshaped_x, reshaped_t, test_size=0.1, random_state=42)
print(len(X_train))
print(len(X_test))
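# With up to 3000 sequences, each cut into 2500/5 = 500 windows, this yields
# as many as 1,500,000 windows before the 90/10 train/test split.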
myrnn = my_RNN(num_of_input_nodes=1,
               num_of_hidden_nodes=num_of_hidden_nodes,
               num_of_output_nodes=1,
               length_of_sequences=length_of_sequences,
               num_of_training_epochs=num_of_training_epochs,
               size_of_mini_batch=size_of_mini_batch,
               num_of_prediction_epochs=100,
               learning_rate=0.01,
               forget_bias=0.8,
               num_of_sample=num_of_sample)
myrnn.train(X_train, y_train)
myrnn.test(X_test, y_test)
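# ---- Inspection snippet: my_RNN.test() (sample_RNN.py, below) dumps the
# inputs, targets, outputs, and loss of a test mini-batch to ./tmp.pkl;
# the snippet below re-reads that file and plots the results. ----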
import pickle
import matplotlib.pyplot as plt
with open("./tmp.pkl", "rb") as f:
    inputs, supervisors, sample_output, sample_loss = pickle.load(f)
print(inputs.shape)
print(supervisors.shape)
print(sample_output.shape)
print(sample_loss)
fig, axes = plt.subplots(3, figsize=(8, 8))
axes[0].plot(inputs[1].ravel())
axes[0].set_title("teacher input")
axes[1].plot(supervisors[1].ravel())
axes[1].set_title("teacher output")
axes[2].plot(supervisors[1].ravel(), label="ground truth")
axes[2].plot(sample_output[1].ravel(), label="NN output")
axes[2].set_title("teacher output vs. NN output")
axes[2].legend()
fig.show()
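# ---- sample_RNN.py: the module imported above via `from sample_RNN import my_RNN` ----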
# https://github.com/yukiB/rnntest
import tensorflow as tf
import numpy as np
import random
import pickle
num_of_input_nodes = 1
num_of_hidden_nodes = 80
num_of_output_nodes = 1
length_of_sequences = 10
num_of_training_epochs = 5000
size_of_mini_batch = 100
num_of_prediction_epochs = 100
learning_rate = 0.01
forget_bias = 0.8
num_of_sample = 1000
def get_batch(batch_size, X, t):
    """
    Draw batch_size sequences from X/t at random (with replacement) and add
    a trailing feature axis, giving arrays of shape (batch_size, seq_len, 1).
    """
    rnum = [random.randint(0, len(X) - 1) for _ in range(batch_size)]
    xs = np.array([[[y] for y in X[r]] for r in rnum])
    ts = np.array([[[y] for y in t[r]] for r in rnum])
    return xs, ts
def create_data(nb_of_samples, sequence_len):
    X = np.zeros((nb_of_samples, sequence_len))
    for row_idx in range(nb_of_samples):
        X[row_idx, :] = np.around(np.random.rand(sequence_len)).astype(int)
    # Create the targets for each sequence
    t = np.sum(X, axis=1)
    return X, t
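# Example: create_data(2, 5) might return
#     X = [[1., 0., 1., 1., 0.],
#          [0., 0., 1., 0., 0.]]
#     t = [3., 1.]
# i.e. each target is the count of ones in its random binary sequence.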
def make_prediction(nb_of_samples):
    sequence_len = 10
    xs, ts = create_data(nb_of_samples, sequence_len)
    return np.array([[[y] for y in x] for x in xs]), np.array([[x] for x in ts])
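# NOTE: make_prediction() is not referenced anywhere else in this gist.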
class my_RNN(object):
    def __init__(self,
                 num_of_input_nodes=1,
                 num_of_hidden_nodes=80,
                 num_of_output_nodes=1,
                 length_of_sequences=10,
                 num_of_training_epochs=5000,
                 size_of_mini_batch=100,
                 num_of_prediction_epochs=100,
                 learning_rate=0.01,
                 forget_bias=0.8,
                 num_of_sample=1000):
        self.num_of_input_nodes = num_of_input_nodes
        self.num_of_hidden_nodes = num_of_hidden_nodes
        self.num_of_output_nodes = num_of_output_nodes
        self.length_of_sequences = length_of_sequences
        self.num_of_training_epochs = num_of_training_epochs
        self.size_of_mini_batch = size_of_mini_batch
        self.num_of_prediction_epochs = num_of_prediction_epochs
        self.learning_rate = learning_rate
        self.forget_bias = forget_bias
        self.num_of_sample = num_of_sample
        # Initialize the random number generators
        random.seed(123)
        np.random.seed(123)
        tf.set_random_seed(123)
        self.g = tf.Graph()
        with self.g.as_default():
            self.build()
    def build(self):
        self.input_ph = tf.placeholder(
            tf.float32,
            [None, self.length_of_sequences, self.num_of_input_nodes],
            name="input")
        self.supervisor_ph = tf.placeholder(
            tf.float32,
            [None, self.length_of_sequences, self.num_of_output_nodes],
            name="supervisor")
        self.istate_ph = tf.placeholder(
            tf.float32,
            [None, self.num_of_hidden_nodes * 2],
            name="istate")
        self.output_op, self.states_op, self.data_op = self._inference(
            self.input_ph, self.istate_ph)
        self.loss_op = self._loss(self.output_op, self.supervisor_ph)
        self.optimizer = tf.train.AdamOptimizer(
            learning_rate=self.learning_rate)
        # self.optimizer = tf.train.GradientDescentOptimizer(
        #     learning_rate=self.learning_rate)
        self.training_op = self._training(self.loss_op)
        self.summary_op = tf.summary.merge_all()
        # tf.initialize_all_variables() is deprecated
        self.init = tf.global_variables_initializer()
    def _inference(self, input_ph, istate_ph):
        with tf.name_scope("inference") as scope:
            weight1_var = tf.Variable(
                tf.truncated_normal(
                    [self.num_of_input_nodes, self.num_of_hidden_nodes],
                    stddev=0.1),
                name="weight1")
            weight2_var = tf.Variable(
                tf.truncated_normal(
                    [self.num_of_hidden_nodes, self.num_of_output_nodes],
                    stddev=0.1),
                name="weight2")
            bias1_var = tf.Variable(
                tf.truncated_normal([self.num_of_hidden_nodes], stddev=0.1),
                name="bias1")
            bias2_var = tf.Variable(
                tf.truncated_normal([self.num_of_output_nodes], stddev=0.1),
                name="bias2")
            # Project the inputs, then split them into a per-time-step list
            # as required by static_rnn.
            in1 = tf.transpose(input_ph, [1, 0, 2])
            in2 = tf.reshape(in1, [-1, self.num_of_input_nodes])
            in3 = tf.matmul(in2, weight1_var) + bias1_var
            in4 = tf.split(in3, self.length_of_sequences, 0)
            cell = tf.nn.rnn_cell.BasicLSTMCell(
                self.num_of_hidden_nodes,
                forget_bias=self.forget_bias,
                state_is_tuple=False)
            rnn_output, states_op = tf.contrib.rnn.static_rnn(
                cell, in4, initial_state=istate_ph)
            seq_output_reshaped = tf.reshape(
                rnn_output,
                shape=[-1, self.num_of_hidden_nodes],
                name="seq_output_reshaped")
            output_op = tf.matmul(seq_output_reshaped, weight2_var) + bias2_var
            # Add summary ops to collect data
            w1_hist = tf.summary.histogram("weights1", weight1_var)
            w2_hist = tf.summary.histogram("weights2", weight2_var)
            b1_hist = tf.summary.histogram("biases1", bias1_var)
            b2_hist = tf.summary.histogram("biases2", bias2_var)
            output_hist = tf.summary.histogram("output", output_op)
            results = [weight1_var, weight2_var, bias1_var, bias2_var]
            return output_op, states_op, results
    def _loss(self, output_op, supervisor_ph):
        with tf.name_scope("loss") as scope:
            supervisor_reshaped = tf.reshape(
                supervisor_ph,
                shape=[-1, self.num_of_output_nodes],
                name="supervisor_reshaped")
            square_error = tf.reduce_mean(
                tf.square(output_op - supervisor_reshaped))
            loss_op = square_error
            tf.summary.scalar("loss", loss_op)
            return loss_op
    def _training(self, loss_op):
        with tf.name_scope("training") as scope:
            training_op = self.optimizer.minimize(loss_op)
            return training_op
    def train(self, X, t):
        with self.g.as_default():
            with tf.Session() as sess:
                saver = tf.train.Saver()
                summary_writer = tf.summary.FileWriter(
                    "/tmp/tensorflow_log", graph=sess.graph)
                sess.run(self.init)
                for epoch in range(self.num_of_training_epochs):
                    inputs, supervisors = get_batch(
                        self.size_of_mini_batch, X, t)
                    train_dict = {
                        self.input_ph: inputs,
                        self.supervisor_ph: supervisors,
                        self.istate_ph: np.zeros(
                            (self.size_of_mini_batch,
                             self.num_of_hidden_nodes * 2)),
                    }
                    sess.run(self.training_op, feed_dict=train_dict)
                    if epoch % 100 == 0:
                        summary_str, train_loss = sess.run(
                            [self.summary_op, self.loss_op],
                            feed_dict=train_dict)
                        print("train#%d, train loss: %e" % (epoch, train_loss))
                        summary_writer.add_summary(summary_str, epoch)
                datas = sess.run(self.data_op)
                self.save_ret = saver.save(sess, "./model.ckpt")
                print(self.save_ret)
    def test(self, X, t):
        with tf.Session(graph=self.g) as sess:
            saver = tf.train.Saver()
            saver.restore(sess, self.save_ret)
            for epoch in range(self.num_of_prediction_epochs):
                inputs, supervisors = get_batch(
                    self.size_of_mini_batch, X, t)
                test_dict = {
                    self.input_ph: inputs,
                    self.supervisor_ph: supervisors,
                    self.istate_ph: np.zeros(
                        (self.size_of_mini_batch,
                         self.num_of_hidden_nodes * 2)),
                }
                sample_output, sample_loss = sess.run(
                    [self.output_op, self.loss_op], feed_dict=test_dict)
                sample_output_reshaped = sample_output.reshape(
                    self.size_of_mini_batch, self.length_of_sequences,
                    self.num_of_output_nodes)
                # Overwritten every epoch, so only the last batch survives
                with open("./tmp.pkl", "wb") as f:
                    pickle.dump(
                        [inputs, supervisors, sample_output_reshaped,
                         sample_loss], f)
    # def _calc_accuracy(self, output_op, inputs, supervisors, sess, prints=False):
    #     print("_calc_accuracy is not activated.")
    #     pass
    #     pred_dict = {
    #         self.input_ph: inputs,
    #         self.supervisor_ph: supervisors,
    #         self.istate_ph: np.zeros(
    #             (self.num_of_prediction_epochs,
    #              self.num_of_hidden_nodes * 2)),
    #     }
    #     output = sess.run([output_op], feed_dict=pred_dict)
    #     def print_result(i, p, q):
    #         [print(list(x)[0]) for x in i]
    #         print("output: %f, correct: %d" % (p, q))
    #     if prints:
    #         [print_result(i, p, q)
    #          for i, p, q in zip(inputs, output[0], supervisors)]
    #     opt = abs(output - supervisors)[0]
    #     total = sum([1 if x[0] < 0.05 else 0 for x in opt])
    #     print("accuracy %f" % (total / float(len(supervisors))))
    #     return output
def main():
    num_of_sample = 1000
    length_of_sequences = 10
    X, t = create_data(num_of_sample, length_of_sequences)
    myrnn = my_RNN(num_of_input_nodes=1,
                   num_of_hidden_nodes=80,
                   num_of_output_nodes=1,
                   length_of_sequences=10,
                   num_of_training_epochs=5000,
                   size_of_mini_batch=100,
                   num_of_prediction_epochs=100,
                   learning_rate=0.01,
                   forget_bias=0.8,
                   num_of_sample=1000)
    myrnn.train(X, t)

if __name__ == '__main__':
    main()
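# NOTE: as written, main() will fail inside get_batch(): create_data() returns
# one scalar target per sequence (t has shape (num_of_sample,)), but
# get_batch() iterates over t[r], and supervisor_ph expects per-step targets
# of shape (batch, length_of_sequences, 1). The companion script above avoids
# this by passing per-step target windows to train().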