# mdn toy code
# Original post: http://blog.otoro.net/2015/11/24/mixture-density-networks-with-tensorflow/
##############################################################
# Import libraries
##############################################################
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import math
##############################################################
# Generate the target data we want to fit
##############################################################
NSAMPLE = 1000
x_data = np.float32(np.random.uniform(-10.5, 10.5, (1, NSAMPLE))).T
r_data = np.float32(np.random.normal(size=(NSAMPLE,1))) # random noise
y_data = np.float32(np.sin(0.75*x_data)*7.0+x_data*0.5+r_data*1.0)
plt.figure(figsize=(8, 8))
plot_out = plt.plot(x_data,y_data,'ro',alpha=0.3)
plt.show()
##############################################################
# Input is x; the target output is y
##############################################################
x = tf.placeholder(dtype=tf.float32, shape=[None,1])
y = tf.placeholder(dtype=tf.float32, shape=[None,1])
##############################################################
# Build the hidden layer
# with 20 hidden nodes
##############################################################
NHIDDEN = 20
W = tf.Variable(tf.random_normal([1,NHIDDEN], stddev=1.0, dtype=tf.float32))
b = tf.Variable(tf.random_normal([1,NHIDDEN], stddev=1.0, dtype=tf.float32))
W_out = tf.Variable(tf.random_normal([NHIDDEN,1], stddev=1.0, dtype=tf.float32))
b_out = tf.Variable(tf.random_normal([1,1], stddev=1.0, dtype=tf.float32))
# Matrix product of [None, 1] and [1, NHIDDEN] gives output shape [None, NHIDDEN]
hidden_layer = tf.nn.tanh(tf.matmul(x, W) + b)
# Matrix product of [None, NHIDDEN] and [NHIDDEN, 1] gives [None, 1],
# i.e. one output per input x
y_out = tf.matmul(hidden_layer,W_out) + b_out
##############################################################
# Define the loss function
# tf.nn.l2_loss() reference: https://www.tensorflow.org/api_docs/python/tf/nn/l2_loss
# "Computes half the L2 norm of a tensor without the sqrt":
# output = sum(t ** 2) / 2
##############################################################
lossfunc = tf.nn.l2_loss(y_out-y)
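# Quick numeric check of the formula above (illustrative, not part of the
# original gist): for t = [1, 2, 3], sum(t**2)/2 = (1+4+9)/2 = 7.0, which is
# exactly what tf.nn.l2_loss would return for that tensor.
assert np.isclose(np.sum(np.array([1.0, 2.0, 3.0]) ** 2) / 2.0, 7.0)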
##############################################################
# weight update
##############################################################
train_op = tf.train.RMSPropOptimizer(learning_rate=0.1, decay=0.8).minimize(lossfunc)
##############################################################
# Everything above builds the graph;
# the session runs start here
##############################################################
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    # train for 1000 epochs
    NEPOCH = 1000
    for i in range(NEPOCH):
        sess.run(train_op, feed_dict={x: x_data, y: y_data})
    # test on an evenly spaced grid
    x_test = np.float32(np.arange(-10.5, 10.5, 0.1))
    print(x_test.shape)
    x_test = x_test.reshape(x_test.size, 1)
    y_test = sess.run(y_out, feed_dict={x: x_test})
    plt.figure(figsize=(8, 8))
    # figsize is the overall figure size in inches (8 x 8), not that it matters much:
    # https://www.labri.fr/perso/nrougier/teaching/matplotlib/#figures-subplots-axes-and-ticks
    plt.plot(x_data, y_data, 'ro', x_test, y_test, 'bo', alpha=0.3)
    plt.show()
# The plot shows a good fit.
# The catch: this approach only works when the x-to-y mapping is one-to-one or many-to-one.
##############################################################
# Now swap the x and y of the data we plotted
##############################################################
temp_data = x_data
x_data = y_data
y_data = temp_data
plt.figure(figsize=(8, 8))
plot_out = plt.plot(x_data,y_data,'ro',alpha=0.3)
plt.show()
##############################################################
# In this case MSE, i.e. tf.nn.l2_loss(), gives no usable answer.
##############################################################
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    # train for 1000 epochs
    NEPOCH = 1000
    for i in range(NEPOCH):
        sess.run(train_op, feed_dict={x: x_data, y: y_data})
    # test
    x_test = np.float32(np.arange(-10.5, 10.5, 0.1))
    print(x_test.shape)
    x_test = x_test.reshape(x_test.size, 1)
    y_test = sess.run(y_out, feed_dict={x: x_test})
    plt.figure(figsize=(8, 8))
    plt.plot(x_data, y_data, 'ro', x_test, y_test, 'bo', alpha=0.3)
    plt.show()
##############################################################
# Trying to fit multiple outputs per input with MSE
# made the predictions fall apart.
#
# Now let's look at MDN - Mixture Density Networks,
# which predict multiple output values for a single input.
##############################################################
# Rather than predicting a single output, an MDN predicts
# a probability distribution over the possible outputs.
# If y = 1 is common and y = 3 is rare when x = 1, it can predict
# something like "y = 1 with 90% probability, y = 3 with 10%".
# This is useful for things like robot arm positions or the next
# pen position in handwriting generation.
##############################################################
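# As an equation (a sketch of the math the code below implements; the
# notation is mine, not from the original gist):
#   P(y | x) = sum_k pi_k(x) * N(y | mu_k(x), sigma_k(x)),  sum_k pi_k(x) = 1
# With pi = (0.9, 0.1) and mu = (1, 3), for example, y near 1 comes out
# roughly nine times as likely as y near 3 -- the "90% vs 10%" case above.
##############################################################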
# MDN 그래프
##############################################################
NHIDDEN = 24
STDEV = 0.5
KMIX = 24 # number of mixtures
NOUT = KMIX * 3 # pi, mu, stdev
x = tf.placeholder(dtype=tf.float32, shape=[None,1], name="x")
y = tf.placeholder(dtype=tf.float32, shape=[None,1], name="y")
Wh = tf.Variable(tf.random_normal([1,NHIDDEN], stddev=STDEV, dtype=tf.float32))
bh = tf.Variable(tf.random_normal([1,NHIDDEN], stddev=STDEV, dtype=tf.float32))
Wo = tf.Variable(tf.random_normal([NHIDDEN,NOUT], stddev=STDEV, dtype=tf.float32))
bo = tf.Variable(tf.random_normal([1,NOUT], stddev=STDEV, dtype=tf.float32))
# Matrix product of [None, 1] and [1, NHIDDEN] gives output shape [None, NHIDDEN]
hidden_layer = tf.nn.tanh(tf.matmul(x, Wh) + bh)
# Matrix product of [None, NHIDDEN] and [NHIDDEN, NOUT] gives [None, NOUT],
# i.e. (number of input x values, 72)
output = tf.matmul(hidden_layer,Wo) + bo
##############################################################
# Extract the values needed to compute the probability density
# function from the network output via a few extra ops
##############################################################
def get_mixture_coef(output):
    # Slice the [None, NOUT] output into three KMIX-wide (24-wide) pieces.
    # tf.split() reference: https://www.tensorflow.org/api_docs/python/tf/split
    # (the argument order changed in TF 1.0)
    # out_pi, out_sigma, out_mu = tf.split(1, 3, output)  # pre-1.0 API
    out_pi, out_sigma, out_mu = tf.split(output, 3, 1)  # TF 1.0+ API
    # https://www.tensorflow.org/api_docs/python/tf/reduce_max
    # axis=1 over out_pi (shape [None, 24]) with keep_dims=True, so max_pi
    # holds the largest of the 24 values per row and has shape [None, 1]
    max_pi = tf.reduce_max(out_pi, 1, keep_dims=True)
    # subtract the row max from every value so the exp() below cannot overflow
    out_pi = tf.subtract(out_pi, max_pi)
    out_pi = tf.exp(out_pi)
    # https://www.tensorflow.org/api_docs/python/tf/reciprocal
    # normalize_pi = tf.inv(tf.reduce_sum(out_pi, 1, keep_dims=True))  # pre-1.0 API
    normalize_pi = tf.reciprocal(tf.reduce_sum(out_pi, 1, keep_dims=True))  # 1 / (sum of the 24 values)
    out_pi = tf.multiply(normalize_pi, out_pi)  # each value / row sum = a probability
    out_sigma = tf.exp(out_sigma)  # exp keeps sigma positive
    return out_pi, out_sigma, out_mu  # mu passes through unchanged
out_pi, out_sigma, out_mu = get_mixture_coef(output)
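# Note: the pi computation above (subtract row max, exp, normalize) is just
# a numerically stable softmax. A minimal NumPy sketch of the same idea
# (illustrative only, not used by the graph):
def np_softmax_rows(a):
    a = a - a.max(axis=1, keepdims=True)  # subtract row max so exp() cannot overflow
    e = np.exp(a)
    return e / e.sum(axis=1, keepdims=True)  # each row sums to 1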
##############################################################
# Build the training set with x and y swapped, as before,
# but with more samples this time: 2500
##############################################################
NSAMPLE = 2500
y_data = np.float32(np.random.uniform(-10.5, 10.5, (1, NSAMPLE))).T
r_data = np.float32(np.random.normal(size=(NSAMPLE,1))) # random noise
x_data = np.float32(np.sin(0.75*y_data)*7.0+y_data*0.5+r_data*1.0)
plt.figure(figsize=(8, 8))
plt.plot(x_data,y_data,'ro', alpha=0.3)
plt.show()
##############################################################
# This time the l2 loss function won't do;
# we will use the logarithm of the likelihood instead.
##############################################################
oneDivSqrtTwoPI = 1 / math.sqrt(2*math.pi) # Gaussian normalisation factor 1/sqrt(2*pi); a constant, so not strictly needed for optimisation
def tf_normal(y, mu, sigma):
    result = tf.subtract(y, mu)
    result = tf.multiply(result, tf.reciprocal(sigma))
    result = -tf.square(result)/2
    return tf.multiply(tf.exp(result), tf.reciprocal(sigma)) * oneDivSqrtTwoPI
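# Scalar NumPy twin of tf_normal, handy for checking values by hand
# (a sketch, not part of the original gist):
#   N(y | mu, sigma) = exp(-((y - mu) / sigma)**2 / 2) / (sigma * sqrt(2*pi))
def np_normal(y, mu, sigma):
    z = (y - mu) / sigma
    return math.exp(-z * z / 2.0) * oneDivSqrtTwoPI / sigma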
def get_lossfunc(out_pi, out_sigma, out_mu, y):
    result = tf_normal(y, out_mu, out_sigma)  # 1) call the function above: per-component densities
    result = tf.multiply(result, out_pi)  # weight each component by its pi
    result = tf.reduce_sum(result, 1, keep_dims=True)  # sum over the mixture
    result = -tf.log(result)
    return tf.reduce_mean(result)
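# In formula form, the loss above is the mean negative log-likelihood:
#   L = -(1/N) * sum_n log( sum_k pi_k(x_n) * N(y_n | mu_k(x_n), sigma_k(x_n)) )
# Minimizing L pushes the mixture density toward the observed (x, y) pairs.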
lossfunc = get_lossfunc(out_pi, out_sigma, out_mu, y)
train_op = tf.train.AdamOptimizer().minimize(lossfunc)
##############################################################
# Now let's actually run the MDN
##############################################################
# keep the session open (instead of a with-block) so the trained weights
# are still available for the sampling and heatmap sections below
sess = tf.Session()
sess.run(tf.global_variables_initializer())
NEPOCH = 10000
loss = np.zeros(NEPOCH) # store the training progress here
for i in range(NEPOCH):
    sess.run(train_op, feed_dict={x: x_data, y: y_data})
    loss[i] = sess.run(lossfunc, feed_dict={x: x_data, y: y_data})
##############################################################
# Plot the loss
##############################################################
plt.figure(figsize=(8, 8))
plt.plot(np.arange(100, NEPOCH, 1), loss[100:], 'r-')  # skip the first 100 epochs, whose large loss would dominate the plot
plt.show()
##############################################################
# Now the actual sampling part
##############################################################
x_test = np.float32(np.arange(-15,15,0.1))
NTEST = x_test.size
x_test = x_test.reshape(NTEST,1) # needs to be a matrix, not a vector
def get_pi_idx(x, pdf):
    # inverse-CDF sampling: walk the cumulative sum of pdf
    # until it passes the uniform draw x, and return that component index
    N = pdf.size
    accumulate = 0
    for i in range(0, N):
        accumulate += pdf[i]
        if accumulate >= x:
            return i
    print('error with sampling ensemble')
    return -1
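# Tiny illustrative check (made-up numbers, not from the gist): with
# pdf = [0.2, 0.5, 0.3], a uniform draw of 0.6 crosses the cumulative
# sums 0.2 -> 0.7 at index 1, so component 1 gets picked.
assert get_pi_idx(0.6, np.array([0.2, 0.5, 0.3])) == 1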
def generate_ensemble(out_pi, out_mu, out_sigma, M=10):
    NTEST = x_test.size
    result = np.random.rand(NTEST, M) # initially uniform random [0, 1]
    rn = np.random.randn(NTEST, M) # standard-normal random matrix (mean 0.0, std 1.0)
    mu = 0
    std = 0
    idx = 0
    # transform result into random ensembles
    for j in range(0, M):
        for i in range(0, NTEST):
            idx = get_pi_idx(result[i, j], out_pi[i])  # pick a mixture component
            mu = out_mu[i, idx]
            std = out_sigma[i, idx]
            result[i, j] = mu + rn[i, j]*std  # draw from that component
    return result
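# Each sample uses the standard reparameterization of a Gaussian draw:
#   y = mu_k + sigma_k * z,  z ~ N(0, 1)
# so every column of result is one independent y sample per test x.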
##############################################################
# Look at the generated values
##############################################################
# reuse the session trained above; opening a new session and re-running
# global_variables_initializer() here would wipe the trained weights
out_pi_test, out_sigma_test, out_mu_test = sess.run(get_mixture_coef(output), feed_dict={x: x_test})
y_test = generate_ensemble(out_pi_test, out_mu_test, out_sigma_test)
plt.figure(figsize=(8, 8))
plt.plot(x_data, y_data, 'ro', x_test, y_test, 'bo', alpha=0.3)
plt.show()
##############################################################
# Among the many green points (the mixture means), we end up selecting the one we want
##############################################################
plt.figure(figsize=(8, 8))
plt.plot(x_test,out_mu_test,'go', x_test,y_test,'bo',alpha=0.3)
plt.show()
##############################################################
# Finally, draw it as a heatmap
##############################################################
x_heatmap_label = np.float32(np.arange(-15,15,0.1))
y_heatmap_label = np.float32(np.arange(-15,15,0.1))
def custom_gaussian(x, mu, std):
    # NumPy counterpart of tf_normal: the Gaussian density N(x | mu, std)
    x_norm = (x-mu)/std
    result = oneDivSqrtTwoPI*math.exp(-x_norm*x_norm/2)/std
    return result
def generate_heatmap(out_pi, out_mu, out_sigma, x_heatmap_label, y_heatmap_label):
    N = x_heatmap_label.size
    M = y_heatmap_label.size
    K = KMIX
    z = np.zeros((N, M)) # accumulates the mixture density over the grid
    mu = 0
    std = 0
    pi = 0
    # add every component's weighted density into the grid
    for k in range(0, K):
        for i in range(0, M):
            pi = out_pi[i, k]
            mu = out_mu[i, k]
            std = out_sigma[i, k]
            for j in range(0, N):
                # N-j-1 flips the rows so y increases upward in imshow
                z[N-j-1, i] += pi * custom_gaussian(y_heatmap_label[j], mu, std)
    return z
def draw_heatmap(xedges, yedges, heatmap):
    extent = [xedges[0], xedges[-1], yedges[0], yedges[-1]]
    plt.figure(figsize=(8, 8))
    plt.imshow(heatmap, extent=extent)
    plt.show()
z = generate_heatmap(out_pi_test, out_mu_test, out_sigma_test, x_heatmap_label, y_heatmap_label)
draw_heatmap(x_heatmap_label, y_heatmap_label, z)
sess.close()  # done with the trained session
# Author's note (@nicewook):
# I still need to study this a bit more.
# Let's also work through the equations in reverse.