Skip to content

Instantly share code, notes, and snippets.

@alexlee-gk
Created April 19, 2018 23:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alexlee-gk/1ae88125ec38efc48b542a4c0356078f to your computer and use it in GitHub Desktop.
Save alexlee-gk/1ae88125ec38efc48b542a4c0356078f to your computer and use it in GitHub Desktop.
Equivalence and timing tests for upsample_conv2d and conv_pool2d.
import numpy as np
import tensorflow as tf
from video_prediction.ops import pad2d_paddings, conv2d, deconv2d, upsample2d, upsample_conv2d, pool2d, conv_pool2d
def test_upsample_conv2d():
    """Check that ``upsample_conv2d`` matches an explicit upsample-then-convolve.

    For every combination of strides, input size, kernel size, input channels
    and filters listed below, the fused op is compared against a reference
    built from ``upsample2d`` (``padding='VALID'``), a ``conv2d`` with
    ``padding='FULL'`` and a crop back to ``strides * (in_height, in_width)``.

    Raises:
        AssertionError: if the two results are not close (``atol=1/255``) for
            any configuration; the message names the failing configuration.
    """
    import itertools

    batch = 16
    # product() varies the right-most factor fastest, i.e. the same order as
    # the equivalent nested for-loops (strides outermost, filters innermost).
    configs = itertools.product(
        ([2, 2], [3, 4]),                            # strides
        ([8, 8], [16, 8]),                           # (in_height, in_width)
        ([3, 3], [4, 4], [5, 5], [3, 6], [3, 7]),    # kernel_size
        (1, 3),                                      # in_channels
        (1, 3),                                      # filters
    )
    # The context manager releases the session even if an assertion fails.
    with tf.Session() as sess:
        for strides, (in_height, in_width), kernel_size, in_channels, filters in configs:
            inputs = np.random.random((batch, in_height, in_width, in_channels)).astype(np.float32)
            kernel = np.random.random((kernel_size[0], kernel_size[1], in_channels, filters)).astype(np.float32)
            bias = np.random.random(filters).astype(np.float32)

            inputs_ph = tf.placeholder(tf.float32, shape=inputs.shape)
            kernel_ph = tf.placeholder(tf.float32, shape=kernel.shape)
            bias_ph = tf.placeholder(tf.float32, shape=bias.shape)

            # op under test
            outputs = upsample_conv2d(inputs_ph, filters, kernel_size=kernel_size, strides=strides,
                                      kernel=kernel_ph, bias=bias_ph)

            # reference: upsample with bilinear interpolation
            inputs_up = upsample2d(inputs_ph, strides, padding='VALID')
            # convolve upsampled input with kernel
            outputs_up = conv2d(inputs_up, filters, kernel_size=kernel_size,
                                kernel=kernel_ph, bias=bias_ph, padding='FULL')
            # crop appropriately: offset by half of the (even part of the) stride
            # plus the difference between the FULL and SAME paddings.
            # NOTE(review): the [dim][1] indexing assumes pad2d_paddings returns
            # one (before, after) pair per dimension -- confirm against ops.
            same_paddings = pad2d_paddings(inputs_ph, kernel_size, strides=(1, 1), padding='SAME')
            full_paddings = pad2d_paddings(inputs_ph, kernel_size, strides=(1, 1), padding='FULL')
            crop_top = (strides[0] - strides[0] % 2) // 2 + full_paddings[1][1] - same_paddings[1][1]
            crop_left = (strides[1] - strides[1] % 2) // 2 + full_paddings[2][1] - same_paddings[2][1]
            outputs_up = outputs_up[:, crop_top:crop_top + strides[0] * in_height,
                                    crop_left:crop_left + strides[1] * in_width, :]

            feed_dict = {inputs_ph: inputs, kernel_ph: kernel, bias_ph: bias}
            # Only the two tensors being compared are fetched.
            outputs_result, outputs_up_result = sess.run([outputs, outputs_up], feed_dict=feed_dict)
            assert np.allclose(outputs_result, outputs_up_result, atol=1.0 / 255.0), (
                'upsample_conv2d mismatch for strides=%r, input=%r, kernel_size=%r, '
                'in_channels=%r, filters=%r'
                % (strides, (in_height, in_width), kernel_size, in_channels, filters))
def test_conv_pool2d():
    """Check that ``conv_pool2d`` matches a convolution followed by average pooling.

    For every combination of strides, input size, kernel size, input channels
    and filters listed below, ``conv_pool2d(..., pool_mode='avg')`` is compared
    against ``conv2d`` followed by ``pool2d`` with ``pool_size=strides`` and
    ``pool_mode='avg'``.

    Raises:
        AssertionError: if the two results are not close (``atol=1/255``) for
            any configuration; the message names the failing configuration.
    """
    import itertools

    batch = 16
    # product() varies the right-most factor fastest, i.e. the same order as
    # the equivalent nested for-loops (strides outermost, filters innermost).
    configs = itertools.product(
        ([2, 2], [3, 4]),                            # strides
        ([12, 8], [18, 16]),                         # (in_height, in_width); should be multiple of strides
        ([3, 3], [4, 4], [5, 5], [3, 6], [3, 7]),    # kernel_size
        (1, 3),                                      # in_channels
        (1, 3),                                      # filters
    )
    # The context manager releases the session even if an assertion fails.
    with tf.Session() as sess:
        for strides, (in_height, in_width), kernel_size, in_channels, filters in configs:
            inputs = np.random.random((batch, in_height, in_width, in_channels)).astype(np.float32)
            kernel = np.random.random((kernel_size[0], kernel_size[1], in_channels, filters)).astype(np.float32)
            bias = np.random.random(filters).astype(np.float32)

            inputs_ph = tf.placeholder(tf.float32, shape=inputs.shape)
            kernel_ph = tf.placeholder(tf.float32, shape=kernel.shape)
            bias_ph = tf.placeholder(tf.float32, shape=bias.shape)

            # op under test
            outputs = conv_pool2d(inputs_ph, filters, kernel_size=kernel_size, strides=strides,
                                  kernel=kernel_ph, bias=bias_ph, pool_mode='avg')

            # reference: convolve, then average-pool with window == strides
            inputs_conv = conv2d(inputs_ph, filters, kernel_size=kernel_size,
                                 kernel=kernel_ph, bias=bias_ph)
            outputs_pool = pool2d(inputs_conv, pool_size=strides, strides=strides, pool_mode='avg')

            feed_dict = {inputs_ph: inputs, kernel_ph: kernel, bias_ph: bias}
            outputs_result, outputs_pool_result = sess.run([outputs, outputs_pool], feed_dict=feed_dict)
            assert np.allclose(outputs_result, outputs_pool_result, atol=1.0 / 255.0), (
                'conv_pool2d mismatch for strides=%r, input=%r, kernel_size=%r, '
                'in_channels=%r, filters=%r'
                % (strides, (in_height, in_width), kernel_size, in_channels, filters))
def test_upsample_conv2d_speed():
    """Time three upsampling variants, forward-only and with an Adam step.

    The variants, in the order their timings are printed, are:

    1. ``upsample_conv2d``
    2. ``upsample2d`` followed by ``conv2d``, cropped to ``strides * input size``
    3. ``deconv2d`` with the same strides (using a separate transposed kernel)

    Two lists of accumulated seconds are printed: first for evaluating the
    ``reduce_sum`` of each output, then for running one Adam training op per
    variant. The first 11 steps of each loop are treated as warm-up and are
    not included in the totals.
    """
    from timeit import default_timer

    num_steps = 1000
    warmup_steps = 11  # steps 0..10 are run but not timed

    batch = 16
    strides = [2, 2]
    in_height = 64
    in_width = 64
    in_channels = 128
    kernel_size = [4, 4]
    filters = 64

    inputs = tf.get_variable('inputs_upsample_conv2d', (batch, in_height, in_width, in_channels), dtype=tf.float32,
                             initializer=tf.truncated_normal_initializer(stddev=0.02))
    kernel = tf.get_variable('kernel_upsample_conv2d', (kernel_size[0], kernel_size[1], in_channels, filters), dtype=tf.float32,
                             initializer=tf.truncated_normal_initializer(stddev=0.02))
    kernel_transposed = tf.get_variable('kernel_transposed_upsample_conv2d',
                                        (kernel_size[0], kernel_size[1], filters, in_channels), dtype=tf.float32,
                                        initializer=tf.truncated_normal_initializer(stddev=0.02))

    outputs = upsample_conv2d(inputs, filters, kernel_size=kernel_size, strides=strides,
                              kernel=kernel, use_bias=False)

    inputs_up = upsample2d(inputs, strides, padding='VALID')
    outputs_up = conv2d(inputs_up, filters, kernel_size=kernel_size, kernel=kernel,
                        use_bias=False)[:, :strides[0] * in_height, :strides[1] * in_width, :]

    outputs_stride = deconv2d(inputs, filters, kernel_size=kernel_size, strides=strides,
                              kernel=kernel_transposed, use_bias=False)

    outputs_sums = [tf.reduce_sum(outputs), tf.reduce_sum(outputs_up), tf.reduce_sum(outputs_stride)]
    train_ops = [tf.train.AdamOptimizer().minimize(outputs_sum, var_list=[kernel, kernel_transposed])
                 for outputs_sum in outputs_sums]

    # The context manager releases the session when the benchmark finishes or fails.
    with tf.Session() as sess:
        def _accumulate_run_times(fetches):
            """Return, per fetch, the total seconds spent in sess.run after warm-up."""
            totals = [0.0 for _ in fetches]
            for step in range(num_steps):
                # Variants are interleaved within each step rather than run
                # back-to-back.
                for i, fetch in enumerate(fetches):
                    timed = step >= warmup_steps
                    if timed:
                        start_time = default_timer()
                    sess.run(fetch)
                    if timed:
                        totals[i] += default_timer() - start_time
            return totals

        sess.run(tf.global_variables_initializer())
        print(_accumulate_run_times(outputs_sums))
        print(_accumulate_run_times(train_ops))
def test_conv_pool2d_speed():
    """Time three downsampling variants, forward-only and with an Adam step.

    The variants, in the order their timings are printed, are:

    1. ``conv_pool2d``
    2. ``conv2d`` (``padding='SAME'``) followed by average ``pool2d``
    3. ``conv2d`` with the same strides

    Two lists of accumulated seconds are printed: first for evaluating the
    ``reduce_sum`` of each output, then for running one Adam training op per
    variant. The first 11 steps of each loop are treated as warm-up and are
    not included in the totals.
    """
    from timeit import default_timer

    num_steps = 1000
    warmup_steps = 11  # steps 0..10 are run but not timed

    batch = 16
    strides = [2, 2]
    in_height = 128
    in_width = 128
    in_channels = 64
    kernel_size = [4, 4]
    filters = 128

    inputs = tf.get_variable('inputs_conv_pool2d', (batch, in_height, in_width, in_channels), dtype=tf.float32,
                             initializer=tf.truncated_normal_initializer(stddev=0.02))
    kernel = tf.get_variable('kernel_conv_pool2d', (kernel_size[0], kernel_size[1], in_channels, filters), dtype=tf.float32,
                             initializer=tf.truncated_normal_initializer(stddev=0.02))

    outputs = conv_pool2d(inputs, filters, kernel_size=kernel_size, strides=strides,
                          kernel=kernel, use_bias=False)

    inputs_conv = conv2d(inputs, filters, kernel_size=kernel_size, kernel=kernel,
                         use_bias=False, padding='SAME')
    outputs_pool = pool2d(inputs_conv, pool_size=strides, strides=strides, pool_mode='avg')

    outputs_stride = conv2d(inputs, filters, kernel_size=kernel_size, strides=strides,
                            kernel=kernel, use_bias=False)

    outputs_sums = [tf.reduce_sum(outputs), tf.reduce_sum(outputs_pool), tf.reduce_sum(outputs_stride)]
    train_ops = [tf.train.AdamOptimizer().minimize(outputs_sum, var_list=[kernel])
                 for outputs_sum in outputs_sums]

    # The context manager releases the session when the benchmark finishes or fails.
    with tf.Session() as sess:
        def _accumulate_run_times(fetches):
            """Return, per fetch, the total seconds spent in sess.run after warm-up."""
            totals = [0.0 for _ in fetches]
            for step in range(num_steps):
                # Variants are interleaved within each step rather than run
                # back-to-back.
                for i, fetch in enumerate(fetches):
                    timed = step >= warmup_steps
                    if timed:
                        start_time = default_timer()
                    sess.run(fetch)
                    if timed:
                        totals[i] += default_timer() - start_time
            return totals

        sess.run(tf.global_variables_initializer())
        print(_accumulate_run_times(outputs_sums))
        print(_accumulate_run_times(train_ops))
if __name__ == "__main__":
    # Script entry point: run the two equivalence checks first, then the two
    # timing benchmarks, in that fixed order.
    for run_test in (
        test_upsample_conv2d,
        test_conv_pool2d,
        test_upsample_conv2d_speed,
        test_conv_pool2d_speed,
    ):
        run_test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment