Last active
July 7, 2019 21:52
-
-
Save dsjt/fe7eace690a098516c1514264af2b7da to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from glob import iglob | |
from sample_RNN import my_RNN | |
import matplotlib.pyplot as plt | |
import numpy as np | |
import pickle | |
from sklearn.model_selection import train_test_split | |
def load_data(num_of_sample, length_of_sequences):
    """Load up to ``num_of_sample`` (input, target) sequence pairs.

    Reads pickle files matching ../work/F/*.pkl; each file is expected to
    hold a pair of arrays (input series, target series).

    Args:
        num_of_sample: maximum number of pickle files to read.
        length_of_sequences: unused; kept for call-site compatibility
            (the caller passes the string "dummy").

    Returns:
        (X, t): two parallel lists of flattened 1-D numpy arrays, one
        entry per file read. Both are empty when no files match or
        ``num_of_sample`` is 0.
    """
    X = []
    t = []
    for i, fn in enumerate(iglob("../work/F/*.pkl")):
        if i >= num_of_sample:
            break
        with open(fn, "rb") as fh:
            # Do not shadow the file handle: the original bound the
            # unpickled target to `f`, clobbering the handle name.
            x, target = pickle.load(fh)
        X.append(x.ravel())
        t.append(target.ravel())
    return X, t
# Hyper-parameters for training on the pickled series data.
batch_size = 2500
length_of_sequences = 5
num_of_sample = 3000
num_of_hidden_nodes = 80
num_of_training_epochs = 2000
size_of_mini_batch = 1

# Second argument is ignored by load_data; "dummy" is intentional.
data_x, data_t = load_data(num_of_sample, "dummy")

# Truncate every series to batch_size points, then chop it into
# non-overlapping windows of length_of_sequences samples each.
windows_per_series = batch_size // length_of_sequences
reshaped_x, reshaped_t = [], []
for series_in, series_out in zip(data_x, data_t):
    reshaped_x += list(np.split(series_in[:batch_size], windows_per_series))
    reshaped_t += list(np.split(series_out[:batch_size], windows_per_series))

# Hold out 10% of the windows for evaluation.
X_train, X_test, y_train, y_test = train_test_split(
    reshaped_x, reshaped_t, test_size=0.1, random_state=42)
print(len(X_train))
print(len(X_test))

myrnn = my_RNN(num_of_input_nodes=1,
               num_of_hidden_nodes=num_of_hidden_nodes,
               num_of_output_nodes=1,
               length_of_sequences=length_of_sequences,
               num_of_training_epochs=num_of_training_epochs,
               size_of_mini_batch=size_of_mini_batch,
               num_of_prediction_epochs=100,
               learning_rate=0.01,
               forget_bias=0.8,
               num_of_sample=num_of_sample)
myrnn.train(X_train, y_train)
myrnn.test(X_test, y_test)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pickle | |
import matplotlib.pyplot as plt | |
# Visualize the evaluation batch that my_RNN.test() dumped to ./tmp.pkl.
with open("./tmp.pkl", "rb") as fh:
    inputs, supervisors, sample_output, sample_loss = pickle.load(fh)

for arr in (inputs, supervisors, sample_output):
    print(arr.shape)
print(sample_loss)

# Three stacked panels: teacher input, teacher output, and the
# teacher output overlaid with the network's prediction.
fig, (ax_input, ax_target, ax_compare) = plt.subplots(3, figsize=(8, 8))
ax_input.plot(inputs[1].ravel())
ax_input.set_title("教師入力")
ax_target.plot(supervisors[1].ravel())
ax_target.set_title("教師出力")
ax_compare.plot(supervisors[1].ravel(), label="答え")
ax_compare.plot(sample_output[1].ravel(), label="サンプル")
ax_compare.set_title("教師出力とNN出力の比較")
ax_compare.legend()
print(sample_loss)
fig.show()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# https://github.com/yukiB/rnntest | |
import tensorflow as tf | |
import numpy as np | |
import random | |
import pickle | |
# Default hyper-parameters (mirrored by my_RNN.__init__'s keyword defaults).
num_of_input_nodes = 1
num_of_hidden_nodes = 80
num_of_output_nodes = 1
length_of_sequences = 10
num_of_training_epochs = 5000
size_of_mini_batch = 100
num_of_prediction_epochs = 100
learning_rate = 0.01
# NOTE(review): this module-level `forget_bias` is read as a global inside
# my_RNN._inference — the constructor's forget_bias argument is ignored
# there; do not rename or remove without fixing that reference.
forget_bias = 0.8
num_of_sample = 1000
def get_batch(batch_size, X, t):
    """Sample a random mini-batch (with replacement) from (X, t).

    Args:
        batch_size: number of sequences to draw.
        X: sequence of input sequences.
        t: sequence of target sequences, aligned with X.

    Returns:
        (xs, ts): numpy arrays of shape (batch_size, seq_len, 1); each
        scalar is wrapped in a length-1 innermost axis so the batch can
        be fed straight into the RNN placeholders.
    """
    # Indices drawn with replacement; randrange(n) is the idiomatic form
    # of randint(0, n - 1) and consumes the RNG identically.
    rnum = [random.randrange(len(X)) for _ in range(batch_size)]
    xs = np.array([[[y] for y in X[r]] for r in rnum])
    ts = np.array([[[y] for y in t[r]] for r in rnum])
    return xs, ts
def create_data(nb_of_samples, sequence_len):
    """Generate random binary sequences and their per-row sums.

    Each row of X holds sequence_len values in {0.0, 1.0} (uniform noise
    rounded with np.around); the target of a row is its number of ones.

    Args:
        nb_of_samples: number of sequences (rows).
        sequence_len: length of each sequence.

    Returns:
        X: float array of shape (nb_of_samples, sequence_len) of 0.0/1.0.
        t: array of shape (nb_of_samples,), the row sums of X.
    """
    # Vectorized: one draw fills the whole matrix. The legacy RandomState
    # stream fills in the same order as the original row-by-row loop, and
    # np.around keeps the float 0.0/1.0 values that loop produced.
    X = np.around(np.random.rand(nb_of_samples, sequence_len))
    # Create the targets for each sequence
    t = np.sum(X, axis=1)
    return X, t
def make_prediction(nb_of_samples):
    """Build one prediction batch: inputs (n, 10, 1) and targets (n, 1)."""
    sequence_len = 10
    xs, ts = create_data(nb_of_samples, sequence_len)
    # Reshape instead of re-wrapping element-by-element: same values and
    # dtype, inputs gain a length-1 feature axis, targets a length-1 column.
    batched_inputs = xs.reshape(nb_of_samples, sequence_len, 1)
    batched_targets = ts.reshape(nb_of_samples, 1)
    return batched_inputs, batched_targets
class my_RNN(object):
    """Single-layer LSTM sequence regressor on the TensorFlow 1.x graph API.

    Each scalar input is projected into the hidden size, run through a
    BasicLSTMCell over the sequence, and every timestep is projected back
    to a scalar output. Training minimizes the mean squared error between
    the per-timestep outputs and the supervisor sequence.
    """

    def __init__(self,
                 num_of_input_nodes=1,
                 num_of_hidden_nodes=80,
                 num_of_output_nodes=1,
                 length_of_sequences=10,
                 num_of_training_epochs=5000,
                 size_of_mini_batch=100,
                 num_of_prediction_epochs=100,
                 learning_rate=0.01,
                 forget_bias=0.8,
                 num_of_sample=1000):
        """Store hyper-parameters, seed all RNGs, and build the graph."""
        self.num_of_input_nodes = num_of_input_nodes
        self.num_of_hidden_nodes = num_of_hidden_nodes
        self.num_of_output_nodes = num_of_output_nodes
        self.length_of_sequences = length_of_sequences
        self.num_of_training_epochs = num_of_training_epochs
        self.size_of_mini_batch = size_of_mini_batch
        self.num_of_prediction_epochs = num_of_prediction_epochs
        self.learning_rate = learning_rate
        self.forget_bias = forget_bias
        self.num_of_sample = num_of_sample
        # Seed every RNG involved so runs are reproducible.
        random.seed(123)
        np.random.seed(123)
        tf.set_random_seed(123)
        # Build the whole computation graph once, in its own tf.Graph.
        self.g = tf.Graph()
        with self.g.as_default():
            self.build()

    def build(self):
        """Create placeholders, inference/loss/training ops, and summaries."""
        self.input_ph = tf.placeholder(
            tf.float32,
            [None, self.length_of_sequences, self.num_of_input_nodes],
            name="input")
        self.supervisor_ph = tf.placeholder(
            tf.float32,
            [None, self.length_of_sequences, self.num_of_output_nodes],
            name="supervisor")
        # BasicLSTMCell with state_is_tuple=False packs (c, h) into one
        # vector, hence the doubled width.
        self.istate_ph = tf.placeholder(
            tf.float32,
            [None, self.num_of_hidden_nodes * 2],
            name="istate")
        self.output_op, self.states_op, self.data_op = self._inference(
            self.input_ph, self.istate_ph)
        self.loss_op = self._loss(self.output_op, self.supervisor_ph)
        self.optimizer = tf.train.AdamOptimizer(
            learning_rate=self.learning_rate)
        # self.optimizer = tf.train.GradientDescentOptimizer(
        #     learning_rate=self.learning_rate)
        self.training_op = self._training(self.loss_op)
        self.summary_op = tf.summary.merge_all()
        # tf.initialize_all_variables() is deprecated; use the TF1 alias.
        self.init = tf.global_variables_initializer()

    def _inference(self, input_ph, istate_ph):
        """Build input projection -> LSTM -> output projection.

        Args:
            input_ph: (batch, seq_len, num_of_input_nodes) placeholder.
            istate_ph: (batch, 2 * num_of_hidden_nodes) initial LSTM state.

        Returns:
            output_op: (seq_len * batch, num_of_output_nodes) predictions,
                flattened TIME-major (static_rnn returns a per-timestep list).
            states_op: final LSTM state.
            results: the four trainable variables (for later inspection).
        """
        with tf.name_scope("inference") as scope:
            weight1_var = tf.Variable(
                tf.truncated_normal(
                    [self.num_of_input_nodes,
                     self.num_of_hidden_nodes],
                    stddev=0.1),
                name="weight1")
            weight2_var = tf.Variable(
                tf.truncated_normal(
                    [self.num_of_hidden_nodes,
                     self.num_of_output_nodes],
                    stddev=0.1),
                name="weight2")
            bias1_var = tf.Variable(
                tf.truncated_normal(
                    [self.num_of_hidden_nodes],
                    stddev=0.1),
                name="bias1")
            bias2_var = tf.Variable(
                tf.truncated_normal(
                    [self.num_of_output_nodes],
                    stddev=0.1),
                name="bias2")
            # (batch, seq, in) -> time-major -> per-timestep list for
            # static_rnn, with the input projection applied first.
            in1 = tf.transpose(input_ph, [1, 0, 2])
            in2 = tf.reshape(in1, [-1, self.num_of_input_nodes])
            in3 = tf.matmul(in2, weight1_var) + bias1_var
            in4 = tf.split(in3, self.length_of_sequences, 0)
            # BUG FIX: use the constructor's forget_bias. The original
            # read the module-level global of the same name, silently
            # ignoring the argument passed to __init__.
            cell = tf.nn.rnn_cell.BasicLSTMCell(
                self.num_of_hidden_nodes,
                forget_bias=self.forget_bias,
                state_is_tuple=False)
            rnn_output, states_op = tf.contrib.rnn.static_rnn(
                cell, in4, initial_state=istate_ph)
            seq_output_reshaped = tf.reshape(
                rnn_output,
                shape=[-1,
                       self.num_of_hidden_nodes],
                name="seq_output_reshaped")
            output_op = tf.matmul(seq_output_reshaped,
                                  weight2_var) + bias2_var
            # Summaries register themselves on the graph; the returned
            # handles are collected by tf.summary.merge_all().
            tf.summary.histogram("weights1", weight1_var)
            tf.summary.histogram("weights2", weight2_var)
            tf.summary.histogram("biases1", bias1_var)
            tf.summary.histogram("biases2", bias2_var)
            tf.summary.histogram("output", output_op)
            results = [weight1_var, weight2_var, bias1_var, bias2_var]
            return output_op, states_op, results

    def _loss(self, output_op, supervisor_ph):
        """Mean squared error between predictions and supervisors."""
        with tf.name_scope("loss") as scope:
            # BUG FIX: output_op is flattened TIME-major (static_rnn yields
            # a seq-length list stacked on axis 0), so the supervisor must
            # be transposed to time-major before flattening; the original
            # flattened it batch-major, mispairing rows for batch > 1.
            supervisor_time_major = tf.transpose(supervisor_ph, [1, 0, 2])
            supervisor_reshaped = tf.reshape(
                supervisor_time_major,
                shape=[-1,
                       self.num_of_output_nodes],
                name="supervisor_reshaped")
            square_error = tf.reduce_mean(
                tf.square(output_op - supervisor_reshaped))
            loss_op = square_error
            tf.summary.scalar("loss", loss_op)
            return loss_op

    def _training(self, loss_op):
        """Return the optimizer's minimize op for loss_op."""
        with tf.name_scope("training") as scope:
            training_op = self.optimizer.minimize(loss_op)
            return training_op

    def train(self, X, t):
        """Train on (X, t), log summaries, and checkpoint to ./model.ckpt.

        Stores the checkpoint path on self.save_ret for test() to restore.
        """
        with self.g.as_default():
            with tf.Session() as sess:
                saver = tf.train.Saver()
                summary_writer = tf.summary.FileWriter(
                    "/tmp/tensorflow_log", graph=sess.graph)
                sess.run(self.init)
                for epoch in range(self.num_of_training_epochs):
                    inputs, supervisors = get_batch(
                        self.size_of_mini_batch, X, t)
                    train_dict = {
                        self.input_ph: inputs,
                        self.supervisor_ph: supervisors,
                        self.istate_ph: np.zeros((self.size_of_mini_batch, self.num_of_hidden_nodes * 2)),
                    }
                    sess.run(self.training_op, feed_dict=train_dict)
                    # Periodic progress report and summary dump.
                    if (epoch) % 100 == 0:
                        summary_str, train_loss = sess.run(
                            [self.summary_op, self.loss_op],
                            feed_dict=train_dict)
                        print("train#%d, train loss: %e" % (epoch, train_loss))
                        summary_writer.add_summary(summary_str, epoch)
                # Fetch trained variable values (kept for parity; unused).
                datas = sess.run(self.data_op)
                self.save_ret = saver.save(sess, "./model.ckpt")
                print(self.save_ret)

    def test(self, X, t):
        """Evaluate random batches from (X, t); dump the last to ./tmp.pkl.

        NOTE: relies on self.save_ret set by train() — call train() first.
        """
        with tf.Session(graph=self.g) as sess:
            saver = tf.train.Saver()
            saver.restore(sess, self.save_ret)
            for epoch in range(self.num_of_prediction_epochs):
                inputs, supervisors = get_batch(
                    self.size_of_mini_batch, X, t)
                test_dict = {
                    self.input_ph: inputs,
                    self.supervisor_ph: supervisors,
                    self.istate_ph: np.zeros((self.size_of_mini_batch, self.num_of_hidden_nodes * 2)),
                }
                sample_output, sample_loss = sess.run(
                    [self.output_op, self.loss_op], feed_dict=test_dict)
            # BUG FIX: outputs are flattened time-major, so reshape as
            # (seq, batch, out) and transpose back to (batch, seq, out);
            # the original reshaped batch-major, scrambling rows for
            # batch > 1.
            sample_output_reshaped = sample_output.reshape(
                self.length_of_sequences, self.size_of_mini_batch,
                self.num_of_output_nodes).transpose(1, 0, 2)
            with open("./tmp.pkl", "wb") as f:
                pickle.dump(
                    [inputs, supervisors, sample_output_reshaped, sample_loss], f)
def main():
    """Train my_RNN on random binary sequences (target = per-row sum)."""
    num_of_sample = 1000
    length_of_sequences = 10
    X, t = create_data(num_of_sample, length_of_sequences)
    # Gather the hyper-parameters in one place, then expand them.
    hyper_params = dict(num_of_input_nodes=1,
                        num_of_hidden_nodes=80,
                        num_of_output_nodes=1,
                        length_of_sequences=10,
                        num_of_training_epochs=5000,
                        size_of_mini_batch=100,
                        num_of_prediction_epochs=100,
                        learning_rate=0.01,
                        forget_bias=0.8,
                        num_of_sample=1000)
    model = my_RNN(**hyper_params)
    model.train(X, t)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment