Created
April 13, 2017 06:54
-
-
Save nicewook/57939132fbc5833ba3f977da3cd41e8c to your computer and use it in GitHub Desktop.
mdn toy code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Source article: http://blog.otoro.net/2015/11/24/mixture-density-networks-with-tensorflow/
##############################################################
# Library imports
##############################################################
import matplotlib.pyplot as plt | |
import numpy as np | |
import tensorflow as tf | |
import math | |
##############################################################
# Generate the target curve that the first network should learn
##############################################################
NSAMPLE = 1000
# Inputs: NSAMPLE uniform draws from [-10.5, 10.5), stored as a float32 column.
x_data = np.random.uniform(-10.5, 10.5, (1, NSAMPLE)).astype(np.float32).T
# Standard-normal noise, one value per sample.
r_data = np.random.normal(size=(NSAMPLE, 1)).astype(np.float32)
# Targets: y = 7*sin(0.75*x) + 0.5*x + noise.
y_data = np.float32(np.sin(0.75 * x_data) * 7.0 + x_data * 0.5 + r_data * 1.0)
plt.figure(figsize=(8, 8))
plot_out = plt.plot(x_data, y_data, 'ro', alpha=0.3)
plt.show()
##############################################################
# Placeholders: x is the network input, y is the expected output
##############################################################
x = tf.placeholder(dtype=tf.float32, shape=[None, 1])
y = tf.placeholder(dtype=tf.float32, shape=[None, 1])
##############################################################
# Build the hidden layer
# The hidden layer has 20 units
##############################################################
NHIDDEN = 20


def _unit_normal_variable(shape):
    """Create a float32 variable initialised from N(0, 1) with the given shape."""
    return tf.Variable(tf.random_normal(shape, stddev=1.0, dtype=tf.float32))


W = _unit_normal_variable([1, NHIDDEN])
b = _unit_normal_variable([1, NHIDDEN])
W_out = _unit_normal_variable([NHIDDEN, 1])
b_out = _unit_normal_variable([1, 1])
# [None, 1] x [1, NHIDDEN] -> [None, NHIDDEN]
hidden_pre_activation = tf.matmul(x, W) + b
hidden_layer = tf.nn.tanh(hidden_pre_activation)
# [None, NHIDDEN] x [NHIDDEN, 1] -> [None, 1],
# i.e. one prediction per input row of x.
y_out = tf.matmul(hidden_layer, W_out) + b_out
##############################################################
# Define the loss function
# See tf.nn.l2_loss(): https://www.tensorflow.org/api_docs/python/tf/nn/l2_loss
# It computes half the squared L2 norm (no square root is applied):
#   output = sum(t ** 2) / 2
##############################################################
residual = y_out - y
lossfunc = tf.nn.l2_loss(residual)
##############################################################
# Weight update
##############################################################
optimizer = tf.train.RMSPropOptimizer(learning_rate=0.1, decay=0.8)
train_op = optimizer.minimize(lossfunc)
##############################################################
# Everything above only builds the graph.
# From here on the graph is executed in a session.
##############################################################
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    # Run 1000 full-batch training steps.
    NEPOCH = 1000
    train_feed = {x: x_data, y: y_data}
    for epoch in range(NEPOCH):
        sess.run(train_op, feed_dict=train_feed)
    # Test: predict on an evenly spaced grid of inputs.
    x_test = np.float32(np.arange(-10.5, 10.5, 0.1))
    print(x_test.shape)
    x_test = x_test.reshape(x_test.size, 1)
    y_test = sess.run(y_out, feed_dict={x: x_test})
# figsize is (8 inches, 8 inches), i.e. the overall figure size; see
# https://www.labri.fr/perso/nrougier/teaching/matplotlib/#figures-subplots-axes-and-ticks
# The exact value is not important here.
plt.figure(figsize=(8, 8))
plt.plot(x_data, y_data, 'ro', x_test, y_test, 'bo', alpha=0.3)
plt.show()
# The plot shows a good fit.
# The catch: this approach only works well when the x -> y relation is
# one-to-one or many-to-one.
##############################################################
# Flip the x and y of the curve we just plotted
##############################################################
# Tuple assignment swaps the two arrays without a temporary name.
x_data, y_data = y_data, x_data
plt.figure(figsize=(8, 8))
plot_out = plt.plot(x_data, y_data, 'ro', alpha=0.3)
plt.show()
##############################################################
# For the inverted data, MSE (i.e. tf.nn.l2_loss()) cannot give a good answer.
##############################################################
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    # Run 1000 full-batch training steps.
    NEPOCH = 1000
    train_feed = {x: x_data, y: y_data}
    for epoch in range(NEPOCH):
        sess.run(train_op, feed_dict=train_feed)
    # Test: predict on an evenly spaced grid of inputs.
    x_test = np.float32(np.arange(-10.5, 10.5, 0.1))
    print(x_test.shape)
    x_test = x_test.reshape(x_test.size, 1)
    y_test = sess.run(y_out, feed_dict={x: x_test})
# figsize is (8 inches, 8 inches), i.e. the overall figure size; see
# https://www.labri.fr/perso/nrougier/teaching/matplotlib/#figures-subplots-axes-and-ticks
# The exact value is not important here.
plt.figure(figsize=(8, 8))
plt.plot(x_data, y_data, 'ro', x_test, y_test, 'bo', alpha=0.3)
plt.show()
##############################################################
# Fitting one input with several valid outputs using MSE
# produced a useless prediction.
#
# Now look at MDN - Mixture Density Networks,
# which predict several output values for a single input.
##############################################################
# Instead of predicting a single output, an MDN predicts a
# probability distribution over the possible outputs.
# If for x = 1 the value y = 1 is common and y = 3 is rare, the
# prediction can be of the form "y = 1 with 90%, y = 3 with 10%".
# Useful for e.g. robot-arm positions or the next pen position in handwriting.
##############################################################
# MDN graph
##############################################################
NHIDDEN = 24
STDEV = 0.5
KMIX = 24  # number of mixtures
NOUT = KMIX * 3  # pi, mu, stdev
x = tf.placeholder(dtype=tf.float32, shape=[None, 1], name="x")
y = tf.placeholder(dtype=tf.float32, shape=[None, 1], name="y")


def _mdn_variable(shape):
    """Create a float32 variable initialised from N(0, STDEV**2) with the given shape."""
    return tf.Variable(tf.random_normal(shape, stddev=STDEV, dtype=tf.float32))


Wh = _mdn_variable([1, NHIDDEN])
bh = _mdn_variable([1, NHIDDEN])
Wo = _mdn_variable([NHIDDEN, NOUT])
bo = _mdn_variable([1, NOUT])
# [None, 1] x [1, NHIDDEN] -> [None, NHIDDEN]
hidden_pre_activation = tf.matmul(x, Wh) + bh
hidden_layer = tf.nn.tanh(hidden_pre_activation)
# [None, NHIDDEN] x [NHIDDEN, NOUT] -> [None, NOUT],
# i.e. (number of input rows, 72).
output = tf.matmul(hidden_layer, Wo) + bo
##############################################################
# Derive the values needed for the probability density function
# from the raw network output.
##############################################################
def get_mixture_coef(output):
    """Split the raw network output into mixture parameters.

    Args:
        output: 2-D tensor whose second axis is divisible by 3; in this
            script it is the [None, NOUT] tensor with NOUT = 3 * KMIX.

    Returns:
        Tuple ``(out_pi, out_sigma, out_mu)`` of tensors, each one third of
        ``output`` along axis 1:
        out_pi    -- mixture weights; each row is a softmax, so it sums to 1.
        out_sigma -- standard deviations, made positive with ``exp``.
        out_mu    -- means, passed through unchanged.
    """
    # tf.split(value, num_splits, axis): three equal slices along axis 1.
    # (Pre-1.0 TensorFlow used the argument order tf.split(1, 3, output).)
    out_pi, out_sigma, out_mu = tf.split(output, 3, 1)
    # Softmax over the mixture axis. tf.nn.softmax performs the same
    # normalisation as the hand-written max-subtract / exp / divide sequence
    # it replaces; the default axis is the last one, i.e. axis 1 here.
    out_pi = tf.nn.softmax(out_pi)
    # exp keeps every standard deviation strictly positive.
    out_sigma = tf.exp(out_sigma)
    return out_pi, out_sigma, out_mu
mixture_tensors = get_mixture_coef(output)
out_pi, out_sigma, out_mu = mixture_tensors
##############################################################
# As before, build a training set with x and y exchanged.
# Use more samples this time: 2500.
##############################################################
NSAMPLE = 2500
y_data = np.random.uniform(-10.5, 10.5, (1, NSAMPLE)).astype(np.float32).T
r_data = np.random.normal(size=(NSAMPLE, 1)).astype(np.float32)  # random noise
# x is now the (multi-valued) function of y.
x_data = np.float32(np.sin(0.75 * y_data) * 7.0 + y_data * 0.5 + r_data * 1.0)
plt.figure(figsize=(8, 8))
plt.plot(x_data, y_data, 'ro', alpha=0.3)
plt.show()
##############################################################
# The L2 loss cannot be used this time.
# The (negative) log of the likelihood is used instead.
##############################################################
# 1 / sqrt(2*pi): normalisation factor of the Gaussian. It only scales the
# density by a constant, so it is not strictly needed for training.
oneDivSqrtTwoPI = 1 / math.sqrt(2*math.pi)


def tf_normal(y, mu, sigma):
    """Return the Gaussian density of ``y`` under N(mu, sigma**2) as a tensor.

    Computes ``exp(-((y - mu) / sigma)**2 / 2) / sigma * oneDivSqrtTwoPI``
    element-wise on the given tensors.
    """
    inv_sigma = tf.reciprocal(sigma)
    standardized = tf.multiply(tf.subtract(y, mu), inv_sigma)
    exponent = -tf.square(standardized)/2
    return tf.multiply(tf.exp(exponent), inv_sigma) * oneDivSqrtTwoPI
def get_lossfunc(out_pi, out_sigma, out_mu, y):
    """Return the mean negative log-likelihood of ``y`` under the mixture.

    Args:
        out_pi: mixture-weight tensor.
        out_sigma: standard-deviation tensor.
        out_mu: mean tensor.
        y: target tensor.

    Returns:
        Scalar tensor: mean over rows of ``-log(sum_k pi_k * N(y; mu_k, sigma_k))``.
    """
    # Per-component density (computed by tf_normal), weighted by pi.
    weighted_density = tf.multiply(tf_normal(y, out_mu, out_sigma), out_pi)
    # Sum over the mixture axis gives the likelihood of each row.
    likelihood = tf.reduce_sum(weighted_density, 1, keep_dims=True)
    return tf.reduce_mean(-tf.log(likelihood))


lossfunc = get_lossfunc(out_pi, out_sigma, out_mu, y)
mdn_optimizer = tf.train.AdamOptimizer()
train_op = mdn_optimizer.minimize(lossfunc)
##############################################################
# Train the MDN, then sample from the trained model
##############################################################
NEPOCH = 10000
loss = np.zeros(NEPOCH)  # training progress, one entry per epoch

# Inputs at which the trained mixture is evaluated.
x_test = np.float32(np.arange(-15, 15, 0.1))
NTEST = x_test.size
x_test = x_test.reshape(NTEST, 1)  # needs to be a matrix, not a vector

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    train_feed = {x: x_data, y: y_data}
    for i in range(NEPOCH):
        # Fetch the loss in the same run call as the update instead of paying
        # for a second forward pass; the value recorded is the loss computed
        # during that step.
        _, loss[i] = sess.run([train_op, lossfunc], feed_dict=train_feed)
    # Variable values live inside a session. Opening a second session and
    # re-running the initializer would evaluate freshly randomised, untrained
    # weights, so the mixture parameters for x_test are fetched here, before
    # the training session closes. The existing out_pi / out_sigma / out_mu
    # tensors are reused rather than calling get_mixture_coef(output) again,
    # which would add duplicate ops to the graph.
    out_pi_test, out_sigma_test, out_mu_test = sess.run(
        [out_pi, out_sigma, out_mu], feed_dict={x: x_test})

##############################################################
# Plot the loss (the first 100 epochs are skipped)
##############################################################
plt.figure(figsize=(8, 8))
plt.plot(np.arange(100, NEPOCH, 1), loss[100:], 'r-')
plt.show()


##############################################################
# Sampling helpers
##############################################################
def get_pi_idx(x, pdf):
    """Return the index of the mixture component selected by the draw ``x``.

    Args:
        x: a number, intended to be a uniform draw from [0, 1].
        pdf: 1-D NumPy array of mixture weights (expected to be non-negative
            and to sum to roughly 1).

    Returns:
        The smallest index ``i`` with ``sum(pdf[:i + 1]) >= x``. If rounding
        leaves the total below ``x``, the last index is returned; previously
        an error was printed and -1 returned, which addresses the same last
        element when used as an index. An empty ``pdf`` yields -1.
    """
    cumulative = np.cumsum(pdf, dtype=np.float64)
    # side='left' gives the first position whose cumulative value is >= x.
    idx = int(np.searchsorted(cumulative, x, side='left'))
    return min(idx, cumulative.size - 1)


def generate_ensemble(out_pi, out_mu, out_sigma, M=10):
    """Draw ``M`` samples from the mixture described by each row.

    Args:
        out_pi: array of shape (rows, components) with mixture weights.
        out_mu: array of the same shape with component means.
        out_sigma: array of the same shape with component standard deviations.
        M: number of samples drawn per row.

    Returns:
        Array of shape (rows, M); entry [i, j] is ``mu + z * sigma`` for a
        component of row i picked according to ``out_pi[i]`` and a
        standard-normal ``z``.
    """
    # Take the row count from the arguments instead of the global x_test.
    n_rows = out_pi.shape[0]
    result = np.random.rand(n_rows, M)  # uniform draws used to pick components
    rn = np.random.randn(n_rows, M)  # standard-normal draws
    for j in range(M):
        for i in range(n_rows):
            idx = get_pi_idx(result[i, j], out_pi[i])
            # Overwrite the uniform draw with the actual sample.
            result[i, j] = out_mu[i, idx] + rn[i, j] * out_sigma[i, idx]
    return result


##############################################################
# Look at the generated values
##############################################################
y_test = generate_ensemble(out_pi_test, out_mu_test, out_sigma_test)
plt.figure(figsize=(8, 8))
plt.plot(x_data, y_data, 'ro', x_test, y_test, 'bo', alpha=0.3)
plt.show()
##############################################################
# Green: the means of all mixture components; each blue sample
# comes from one component picked among them.
##############################################################
plt.figure(figsize=(8, 8))
plt.plot(x_test, out_mu_test, 'go', x_test, y_test, 'bo', alpha=0.3)
plt.show()
##############################################################
# Finally, draw the result as a heat map
##############################################################
x_heatmap_label = np.arange(-15, 15, 0.1).astype(np.float32)
y_heatmap_label = np.arange(-15, 15, 0.1).astype(np.float32)
def custom_gaussian(x, mu, std):
    """Return the density of the scalar ``x`` under a Gaussian N(mu, std**2).

    Uses ``math.exp``, so the arguments must be scalars.
    """
    z = (x - mu) / std
    unnormalised = math.exp(-z * z / 2)
    return oneDivSqrtTwoPI * unnormalised / std
def generate_heatmap(out_pi, out_mu, out_sigma, x_heatmap_label, y_heatmap_label):
    """Evaluate the mixture density on an (x, y) grid.

    Args:
        out_pi: array of shape (rows, components) with mixture weights; row i
            belongs to ``x_heatmap_label[i]``.
        out_mu: array of the same shape with component means.
        out_sigma: array of the same shape with component standard deviations.
        x_heatmap_label: 1-D array of x positions (one image column each).
        y_heatmap_label: 1-D array of y values at which the density is evaluated.

    Returns:
        float64 array ``z`` of shape ``(len(y_heatmap_label), len(x_heatmap_label))``.
        Rows run from the last y label to the first, so
        ``z[r, i] = sum_k pi[i, k] * N(y[len(y) - 1 - r]; mu[i, k], sigma[i, k])``.

    Raises:
        ValueError: if the parameter arrays have fewer rows than there are
            x labels.
    """
    n_x = x_heatmap_label.size
    n_y = y_heatmap_label.size
    if out_pi.shape[0] < n_x:
        raise ValueError(
            "mixture parameters have %d rows but %d x labels were given"
            % (out_pi.shape[0], n_x))
    # Rows are indexed by y and columns by x. The previous loop bounds were
    # exchanged and therefore only worked when both label arrays had the same
    # length. The component count now comes from the array shape rather than
    # the global KMIX.
    pi = np.asarray(out_pi, dtype=np.float64)[:n_x]
    mu = np.asarray(out_mu, dtype=np.float64)[:n_x]
    sigma = np.asarray(out_sigma, dtype=np.float64)[:n_x]
    # Reverse y so that row r holds the density at y[n_y - 1 - r].
    y_desc = np.asarray(y_heatmap_label, dtype=np.float64)[::-1]
    # Broadcast to (n_y, n_x, components).
    standardized = (y_desc[:, np.newaxis, np.newaxis] - mu[np.newaxis]) / sigma[np.newaxis]
    density = np.exp(-0.5 * standardized ** 2) / (sigma * np.sqrt(2.0 * np.pi))
    # Weight each component and sum over the component axis.
    z = (pi * density).sum(axis=2)
    assert z.shape == (n_y, n_x)
    return z
def draw_heatmap(xedges, yedges, heatmap):
    """Show ``heatmap`` as an image in a new 8x8 inch figure.

    The image extent is taken from the first and last entries of ``xedges``
    and ``yedges``.
    """
    x_first, x_last = xedges[0], xedges[-1]
    y_first, y_last = yedges[0], yedges[-1]
    plt.figure(figsize=(8, 8))
    plt.imshow(heatmap, extent=[x_first, x_last, y_first, y_last])
    plt.show()
# Evaluate the mixture density on the label grid, then display it.
z = generate_heatmap(
    out_pi_test, out_mu_test, out_sigma_test,
    x_heatmap_label, y_heatmap_label,
)
draw_heatmap(x_heatmap_label, y_heatmap_label, z)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
아직 조금 더 공부해야 한다.
수식도 역으로 계산해보자