Last active
December 9, 2015 10:20
-
-
Save aidiary/a9c2e9ab48de446df577 to your computer and use it in GitHub Desktop.
Theanoによる自己符号化器の実装
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#coding: utf-8 | |
import time | |
import numpy as np | |
import theano | |
import theano.tensor as T | |
from theano.tensor.shared_randomstreams import RandomStreams | |
import gzip | |
import cPickle | |
class Autoencoder(object): | |
def __init__(self, numpy_rng, theano_rng=None, | |
input=None, | |
n_visible=784, n_hidden=500, | |
W=None, bhid=None, bvis=None): | |
self.n_visible = n_visible | |
self.n_hidden = n_hidden | |
if not theano_rng: | |
theano_rng = RandomStreams(numpy_rng.randint(2 ** 30)) | |
if not W: | |
# 入力層と出力層の間の重み | |
initial_W = np.asarray( | |
numpy_rng.uniform( | |
low=-4 * np.sqrt(6.0 / (n_hidden + n_visible)), | |
high=4 * np.sqrt(6.0 / (n_hidden + n_visible)), | |
size=(n_visible, n_hidden) | |
), | |
dtype=theano.config.floatX | |
) | |
W = theano.shared(value=initial_W, name='W', borrow=True) | |
if not bvis: | |
# 入力層(visible)のユニットのバイアス | |
bvis = theano.shared( | |
value=np.zeros(n_visible, dtype=theano.config.floatX), | |
borrow=True) | |
if not bhid: | |
# 隠れ層(hidden)のユニットのバイアス | |
bhid = theano.shared( | |
value=np.zeros(n_hidden, dtype=theano.config.floatX), | |
name='b', | |
borrow=True) | |
# パラメータ | |
self.W = W | |
self.b = bhid | |
self.W_prime = self.W.T | |
self.b_prime = bvis | |
self.params = [self.W, self.b, self.b_prime] | |
self.theano_rng = theano_rng | |
if input is None: | |
self.x = T.dmatrix(name='input') | |
else: | |
self.x = input | |
def get_hidden_values(self, input): | |
"""入力層の値を隠れ層の値に変換""" | |
return T.nnet.sigmoid(T.dot(input, self.W) + self.b) | |
def get_reconstructed_input(self, hidden): | |
"""隠れ層の値を入力層の値に逆変換""" | |
return T.nnet.sigmoid(T.dot(hidden, self.W_prime) + self.b_prime) | |
def get_cost_updates(self, learning_rate): | |
"""コスト関数と更新式のシンボルを返す""" | |
# 入力を変換 | |
y = self.get_hidden_values(self.x) | |
# 変換した値を逆変換で入力に戻す | |
z = self.get_reconstructed_input(y) | |
# コスト関数のシンボル | |
# 元の入力と再構築した入力の交差エントロピー誤差を計算 | |
# 入力xがミニバッチのときLはベクトルになる | |
L = - T.sum(self.x * T.log(z) + (1 - self.x) * T.log(1 - z), axis=1) | |
# Lはミニバッチの各サンプルの交差エントロピー誤差なので全サンプルで平均を取る | |
cost = T.mean(L) | |
# cost += 0.001 * abs(self.W).sum() # L1 regularization | |
# cost += 0.001 * (self.W ** 2).sum() # L2 regularization | |
# 誤差関数の微分 | |
gparams = T.grad(cost, self.params) | |
# 更新式のシンボル | |
updates = [(param, param - learning_rate * gparam) | |
for param, gparam in zip(self.params, gparams)] | |
return cost, updates | |
def feedforward(self): | |
"""入力をフィードフォワードさせて出力を計算""" | |
y = self.get_hidden_values(self.x) | |
z = self.get_reconstructed_input(y) | |
return z | |
def __getstate__(self): | |
"""パラメータの状態を返す""" | |
return (self.W.get_value(), self.b.get_value(), self.b_prime.get_value()) | |
def __setstate__(self, state): | |
"""パラメータの状態をセット""" | |
self.W.set_value(state[0]) | |
self.b.set_value(state[1]) | |
self.b_prime.set_value(state[2]) | |
def load_data(dataset): | |
"""データセットをロードしてGPUの共有変数に格納""" | |
f = gzip.open(dataset, 'rb') | |
train_set, valid_set, test_set = cPickle.load(f) | |
f.close() | |
def shared_dataset(data_xy, borrow=True): | |
data_x, data_y = data_xy | |
# 共有変数には必ずfloat型で格納 | |
shared_x = theano.shared(np.asarray(data_x, dtype=theano.config.floatX), borrow=borrow) | |
shared_y = theano.shared(np.asarray(data_y, dtype=theano.config.floatX), borrow=borrow) | |
# ラベルはint型なのでキャストして返す | |
return shared_x, T.cast(shared_y, 'int32') | |
test_set_x, test_set_y = shared_dataset(test_set) | |
valid_set_x, valid_set_y = shared_dataset(valid_set) | |
train_set_x, train_set_y = shared_dataset(train_set) | |
rval = [(train_set_x, train_set_y), | |
(valid_set_x, valid_set_y), | |
(test_set_x, test_set_y)] | |
return rval | |
if __name__ == "__main__": | |
learning_rate = 0.1 | |
training_epochs = 20 | |
batch_size = 20 | |
# 学習データのロード | |
datasets = load_data('mnist.pkl.gz') | |
# 自己符号化器は教師なし学習なので訓練データのラベルは使わない | |
train_set_x = datasets[0][0] | |
# ミニバッチ数 | |
n_train_batches = train_set_x.get_value(borrow=True).shape[0] / batch_size | |
# ミニバッチのインデックスを表すシンボル | |
index = T.lscalar() | |
# ミニバッチの学習データを表すシンボル | |
x = T.matrix('x') | |
# モデル構築 | |
rng = np.random.RandomState(123) | |
theano_rng = RandomStreams(rng.randint(2 ** 30)) | |
autoencoder = Autoencoder(numpy_rng=rng, | |
theano_rng=theano_rng, | |
input=x, | |
n_visible=28 * 28, | |
n_hidden=500) | |
# コスト関数と更新式のシンボルを取得 | |
cost, updates = autoencoder.get_cost_updates(learning_rate=learning_rate) | |
# 訓練用の関数を定義 | |
train_da = theano.function([index], | |
cost, | |
updates=updates, | |
givens={ | |
x: train_set_x[index * batch_size: (index + 1) * batch_size] | |
}) | |
# モデル訓練 | |
fp = open("cost.txt", "w") | |
start_time = time.clock() | |
for epoch in xrange(training_epochs): | |
c = [] | |
for batch_index in xrange(n_train_batches): | |
c.append(train_da(batch_index)) | |
print "Training epoch %d, cost %f" % (epoch, np.mean(c)) | |
fp.write("%d\t%f\n" % (epoch, np.mean(c))) | |
fp.flush() | |
end_time = time.clock() | |
training_time = (end_time - start_time) | |
fp.close() | |
print "time: %ds" % (training_time) | |
# 学習したモデルの状態を保存 | |
f = open('autoencoder.pkl', 'wb') | |
cPickle.dump(autoencoder.__getstate__(), f, protocol=cPickle.HIGHEST_PROTOCOL) | |
f.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment