@hassaku
Last active June 16, 2017 01:02
# __future__ imports must come before any other import
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import argparse
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from scipy.stats import norm

seed = 42
np.random.seed(seed)
tf.set_random_seed(seed)

def linear_nn(input, output_dim, scope=None, stddev=1.0):
    # Single fully connected (affine) layer: output = input * w + b
    norm = tf.random_normal_initializer(stddev=stddev)
    const = tf.constant_initializer(0.0)
    with tf.variable_scope(scope or 'linear_nn'):
        w = tf.get_variable('w', [input.get_shape()[1], output_dim], initializer=norm)
        b = tf.get_variable('b', [output_dim], initializer=const)
        return tf.matmul(input, w) + b

def generator(input, h_dim, out_dim):
    h0 = tf.nn.softplus(linear_nn(input, h_dim, 'g0'))
    h1 = linear_nn(h0, out_dim, 'g1')
    return h1

def discriminator(input, h_dim):
    h0 = tf.tanh(linear_nn(input, h_dim * 2, 'd0'))
    h1 = tf.tanh(linear_nn(h0, h_dim * 2, 'd1'))
    h2 = tf.tanh(linear_nn(h1, h_dim * 2, scope='d2'))
    h3 = tf.sigmoid(linear_nn(h2, 1, scope='d3'))
    return h3

def optimizer(loss, var_list, initial_learning_rate):
    decay = 0.95
    num_decay_steps = 150
    batch = tf.Variable(0)
    learning_rate = tf.train.exponential_decay(
        initial_learning_rate,
        batch,
        num_decay_steps,
        decay,
        staircase=True
    )
    optimizer = tf.train.GradientDescentOptimizer(learning_rate).minimize(
        loss,
        global_step=batch,
        var_list=var_list
    )
    return optimizer
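
# Note on the schedule: with staircase=True, tf.train.exponential_decay computes
#   learning_rate = initial_learning_rate * decay ** floor(global_step / num_decay_steps)
# so the learning rate above is multiplied by 0.95 once every 150 optimizer steps.
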
class GAN(object):
    def __init__(self, data, gen, dims, num_steps, batch_size, log_every):
        self.data = data
        self.gen = gen
        self.dims = dims
        self.num_steps = num_steps
        self.batch_size = batch_size
        self.log_every = log_every
        self.mlp_hidden_size = 4
        self.learning_rate = 0.03
        self._create_model()

    def _create_model(self):
        # Discriminator pre-training model. To help training converge, the data distribution is
        # learned in advance (here by regressing the network onto the true pdf values) and the
        # resulting weights are copied into the discriminator when GAN training starts.
        with tf.variable_scope('D_pre'):
            self.pre_input = tf.placeholder(tf.float32, shape=(self.batch_size, self.dims))
            self.pre_labels = tf.placeholder(tf.float32, shape=(self.batch_size, 1))
            D_pre = discriminator(self.pre_input, self.mlp_hidden_size)
            self.pre_loss = tf.reduce_mean(tf.square(D_pre - self.pre_labels))
            self.pre_opt = optimizer(self.pre_loss, None, self.learning_rate)
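        # The pre-training targets (set up in train() below) are the true density values
        # norm.pdf(d, mu, sigma), so D_pre is a least-squares fit to the data density.
        # For example, at the mode d = mu = 4 the target is 1 / (0.5 * sqrt(2 * pi)) ~ 0.80.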
        # Generator
        with tf.variable_scope('Gen'):
            self.z = tf.placeholder(tf.float32, shape=(self.batch_size, 1))
            self.G = generator(self.z, self.mlp_hidden_size, self.dims)
        # Discriminator
        #
        #   z -> G -> D2
        #   x -------> D1
        #
        with tf.variable_scope('Disc') as scope:
            self.x = tf.placeholder(tf.float32, shape=(self.batch_size, self.dims))
            self.D1 = discriminator(self.x, self.mlp_hidden_size)  # D1: discriminator output for real data x
            scope.reuse_variables()  # D1 and D2 share the same discriminator weights
            self.D2 = discriminator(self.G, self.mlp_hidden_size)  # D2: discriminator output for data generated from noise z
        # Training objectives (minimax optimization)
        self.loss_d = tf.reduce_mean(-tf.log(self.D1) - tf.log(1 - self.D2))  # the discriminator learns to output 1 for real data and 0 for generated data
        self.loss_g = tf.reduce_mean(-tf.log(self.D2))  # the generator learns to make the discriminator output 1 for generated data (i.e. mistake it for real data)
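        # These losses are minibatch estimates of the GAN value function of Goodfellow et al. (2014):
        #   min_G max_D  V(D, G) = E_x[log D(x)] + E_z[log(1 - D(G(z)))]
        # loss_d is -V(D, G), so minimizing it maximizes V with respect to the discriminator.
        # loss_g uses the non-saturating heuristic -E_z[log D(G(z))] instead of the original
        # E_z[log(1 - D(G(z)))], which gives the generator stronger gradients early in training.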
        self.d_pre_params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope='D_pre')
        self.d_params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope='Disc')
        self.g_params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope='Gen')
        self.opt_d = optimizer(self.loss_d, self.d_params, self.learning_rate)
        self.opt_g = optimizer(self.loss_g, self.g_params, self.learning_rate)

    def train(self):
        loss_history_d = []
        loss_history_g = []
        plt.figure(figsize=(14, 4))
        with tf.Session() as session:
            tf.global_variables_initializer().run()
            if False:  # NOTE: discriminator pre-training is disabled here; change to True to enable it
                # Pre-train the discriminator (learn the density function)
                num_pretrain_steps = 1000
                for step in range(num_pretrain_steps):
                    # TODO
                    d = (np.random.random(self.batch_size) - 0.5) * 10.0
                    labels = norm.pdf(d, loc=self.data.mu, scale=self.data.sigma)
                    pretrain_loss, _ = session.run([self.pre_loss, self.pre_opt], {
                        self.pre_input: np.reshape(d, (self.batch_size, self.dims)),
                        self.pre_labels: np.reshape(labels, (self.batch_size, 1))
                    })
                self.weightsD = session.run(self.d_pre_params)
                for i, v in enumerate(self.d_params):
                    session.run(v.assign(self.weightsD[i]))
            self._plot_distributions(session, 'init')
            # GAN training
            for step in range(self.num_steps):
                # update discriminator
                x = self.data.sample(self.batch_size)
                z = self.gen.sample(self.batch_size)
                loss_d, _ = session.run([self.loss_d, self.opt_d], {
                    self.x: np.reshape(x, (self.batch_size, self.dims)),
                    self.z: np.reshape(z, (self.batch_size, 1))
                })
                # update generator
                z = self.gen.sample(self.batch_size)
                loss_g, _ = session.run([self.loss_g, self.opt_g], {
                    self.z: np.reshape(z, (self.batch_size, 1))
                })
                loss_history_d.append(loss_d)
                loss_history_g.append(loss_g)
                if step % self.log_every == 0:
                    print('{step}: loss-disc={loss_disc}, loss-gen={loss_gen}'.format(step=step, loss_disc=loss_d, loss_gen=loss_g))
                    self._plot_distributions(session)
            # Training finished
            self._plot_distributions(session, 'finished')
            if self.dims == 1:
                plt.ylim([0, 1])
                plt.title('1D Generative Adversarial Network')
                plt.xlabel('Data values')
                plt.ylabel('Probability density')
                leg = plt.legend(fancybox=True)
                leg.get_frame().set_alpha(0.5)
                plt.show()
            self._plot_loss_curve(loss_history_d, loss_history_g)

    def _plot_distributions(self, session, status=None):
        if self.dims != 1:
            return
        db, pd, pg = self._samples(session)
        db_x = np.linspace(-self.gen.range, self.gen.range, len(db))
        p_x = np.linspace(-self.gen.range, self.gen.range, len(pd))
        if status == 'finished':
            plt.plot(db_x, db, 'r', linewidth=3, label='decision boundary')
            plt.plot(p_x, pd, 'g', linewidth=3, label='real data')
            plt.plot(p_x, pg, 'b', linewidth=3, label='generated data')
        elif status == 'init':
            plt.plot(db_x, db, 'r--', label='initial decision boundary')
            plt.plot(p_x, pg, 'b--', label='initial generated data')
        else:
            plt.plot(db_x, db, 'r', alpha=0.4)
            plt.plot(p_x, pg, 'b', alpha=0.4)

    def _plot_loss_curve(self, loss_history_d, loss_history_g):
        plt.figure(figsize=(14, 4))
        plt.subplot(1, 2, 1)
        plt.plot(loss_history_d, 'r')
        plt.title('loss of discriminator')
        plt.xlim([0, len(loss_history_d)])
        plt.subplot(1, 2, 2)
        plt.plot(loss_history_g, 'b')
        plt.title('loss of generator')
        plt.xlim([0, len(loss_history_g)])
        plt.show()

    def _samples(self, session, num_points=10000, num_bins=100):
        xs = np.linspace(-self.gen.range, self.gen.range, num_points)
        bins = np.linspace(-self.gen.range, self.gen.range, num_bins)
        # decision boundary
        db = np.zeros((num_points, 1))
        for i in range(num_points // self.batch_size):
            db[self.batch_size * i:self.batch_size * (i + 1)] = session.run(self.D1, {
                self.x: np.reshape(
                    xs[self.batch_size * i:self.batch_size * (i + 1)],
                    (self.batch_size, 1)
                )
            })
        # data distribution
        d = self.data.sample(num_points)
        pd, _ = np.histogram(d, bins=bins, density=True)
        # generated samples
        zs = np.linspace(-self.gen.range, self.gen.range, num_points)
        g = np.zeros((num_points, 1))
        for i in range(num_points // self.batch_size):
            g[self.batch_size * i:self.batch_size * (i + 1)] = session.run(self.G, {
                self.z: np.reshape(
                    zs[self.batch_size * i:self.batch_size * (i + 1)],
                    (self.batch_size, 1)
                )
            })
        pg, _ = np.histogram(g, bins=bins, density=True)
        return db, pd, pg

    def generate_data(self, N, data_range=8):
        # Note: this opens a fresh session and re-initializes all variables, so it does not
        # reuse the weights learned in train(); N must also equal batch_size for the reshape to work.
        with tf.Session() as session:
            tf.global_variables_initializer().run()
            z = np.linspace(-data_range, data_range, N) + np.random.random(N) * 0.01
            return session.run(self.G, {self.z: z.reshape((self.batch_size, 1))})

class DataDistribution(object):
    def __init__(self):
        self.mu = 4
        self.sigma = 0.5

    def sample(self, N):
        samples = np.random.normal(self.mu, self.sigma, N)
        samples.sort()
        return samples

class GeneratorDistribution(object):
    def __init__(self, range):
        self.range = range

    def sample(self, N):
        return np.linspace(-self.range, self.range, N) + np.random.random(N) * 0.01
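
# Note: GeneratorDistribution.sample() is not i.i.d. uniform noise; it returns evenly spaced points
# over [-range, range] plus a small random jitter (a stratified sample), and DataDistribution.sample()
# sorts its Gaussian draws. Pairing ordered inputs with ordered data appears to be a deliberate
# simplification for this 1D toy problem: the generator only has to learn a roughly monotone mapping
# from input position to output value.
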
tf.reset_default_graph()
tf.set_random_seed(seed)  # reset_default_graph() discards the graph-level seed, so set it again

# Train a generator that takes (roughly) uniform random input and produces samples whose
# distribution is close to the true data distribution N(mu=4, sigma=0.5).
model = GAN(
    data=DataDistribution(),
    gen=GeneratorDistribution(range=8),
    dims=1,
    num_steps=2000,
    batch_size=12,
    log_every=100
)
model.train()
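
# A possible extension (not part of the original gist): generate_data() above re-initializes all
# variables in a fresh session, so it cannot reuse the weights learned in train(). One way to fix
# this would be to save the variables with tf.train.Saver at the end of train() and restore them
# before sampling; the checkpoint path 'gan_1d.ckpt' below is only an illustrative choice.
#
#     # inside train(), after the training loop (still within the session):
#     saver = tf.train.Saver()
#     saver.save(session, 'gan_1d.ckpt')
#
#     # later, to draw batch_size samples from the trained generator:
#     with tf.Session() as session:
#         tf.train.Saver().restore(session, 'gan_1d.ckpt')
#         z = model.gen.sample(model.batch_size).reshape((model.batch_size, 1))
#         samples = session.run(model.G, {model.z: z})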
@hassaku (Author) commented Apr 5, 2017:

[pasted image at 2017_04_04 05_10 pm: result plot]
