Created June 1, 2017 15:57
from keras import backend as K
from keras.layers import Layer
class MaxPoolingMask2D(Layer):
def __init__(self, pool_size=(2, 2), strides=(2, 2), padding='valid', **kwargs):
super(MaxPoolingMask2D, self).__init__(**kwargs)
self.pool_size = pool_size
self.strides = strides
self.padding = padding
def build(self, input_shape):
# Create a trainable weight variable for this layer.
super(MaxPoolingMask2D, self).build(input_shape) # Be sure to call this somewhere!
def call(self, inputs, **kwargs):
pool_size = self.pool_size
padding = self.padding
strides = self.strides
if K.backend() == 'tensorflow':
ksize = [1, pool_size[0], pool_size[1], 1]
padding = padding.upper()
strides = [1, strides[0], strides[1], 1]
output, argmax =, ksize=ksize, strides=strides, padding=padding)
raise NotImplementedError('{} backend is not supported for layer {}'.format(K.backend(), type(self).__name__))
return [output, argmax]
def compute_output_shape(self, input_shape):
ratio = (1, 2, 2, 1)
output_shape = [dim / ratio[idx] if dim is not None else None for idx, dim in enumerate(input_shape)]
output_shape = tuple(output_shape)
return [output_shape, output_shape]
def unpool(updates, mask, ksize=(2, 2), output_shape=None, name=''):
tf =
mask = tf.cast(mask, tf.int32)
input_shape = tf.shape(updates, out_type=tf.int32)
# calculation new shape
if output_shape is None:
output_shape = (input_shape[0], input_shape[1] * ksize[0], input_shape[2] * ksize[1], input_shape[3])
# calculation indices for batch, height, width and feature maps
one_like_mask = tf.ones_like(mask, dtype=tf.int32)
batch_shape = tf.concat([[input_shape[0]], [1], [1], [1]], 0)
batch_range = tf.reshape(tf.range(output_shape[0], dtype=tf.int32), shape=batch_shape)
b = one_like_mask * batch_range
y = mask // (output_shape[2] * output_shape[3])
x = (mask // output_shape[3]) % output_shape[2] #mask % (output_shape[2] * output_shape[3]) // output_shape[3]
feature_range = tf.range(output_shape[3], dtype=tf.int32)
f = one_like_mask * feature_range
# transpose indices & reshape update values to one dimension
updates_size = tf.size(updates)
indices = tf.transpose(tf.reshape(tf.stack([b, y, x, f]), [4, updates_size]))
values = tf.reshape(updates, [updates_size])
ret = tf.scatter_nd(indices, values, output_shape)
return ret
# This is a dummy layer
class DummyLayer(Layer):
def __init__(self, **kwargs):
super(DummyLayer, self).__init__(**kwargs)
def build(self, input_shape):
super(DummyLayer, self).build(input_shape)
def call(self, inputs, **kwargs):
return inputs
def compute_output_shape(self, input_shape):
return input_shape
from keras import backend as K
import numpy as np
############## Define input array ##############
image = np.array([[[[0.5931], [0.8745], [0.9435], [0.0106]],
[[0.3912], [0.2010], [0.3417], [0.9112]],
[[0.7642], [0.5194], [0.2918], [0.6244]],
[[0.7514], [0.8144], [0.0264], [0.8057]]]])
image -= np.min(image)
image /= np.max(image)
image *= 255
image = image.astype(np.uint8)
# [[159 236 255 0]
# [104 52 90 246]
# [205 139 76 167]
# [202 219 4 217]]
image_tensor = K.variable(image) # not a real image but let's assume it is
########## Run maxpooling with argmax ##########
updates, indices = MaxPoolingMask2D()(image_tensor)
############## Check output shape ##############
print(K.eval(updates)[0, :, :, 0])
# [[ 236. 255.]
# [ 219. 217.]]
print(K.eval(indices)[0, :, :, 0])
# [[ 1 2]
# [13 15]]
########## attempt unpool (function) ##########
unpooled = K.eval(unpool(updates=updates, mask=indices))
print(unpooled[0, :, :, 0])
# [[ 0. 236. 255. 0.]
# [ 0. 0. 0. 0.]
# [ 0. 0. 0. 0.]
# [ 0. 219. 0. 217.]]
############ attempt unpool (class) ############
unpool_layer = DummyLayer()
unpooled = unpool_layer(indices) # replace with updates and it runs fine
unpooled = K.eval(unpooled)
IndexError Traceback (most recent call last)
<ipython-input-45-0af32df50033> in <module>()
14 unpool_layer = Unpooling2D()
---> 15 unpooled = unpool_layer(indices)
16 unpooled = K.eval(unpooled)
{KERAS_DIR}/keras/engine/topology.pyc in __call__(self, inputs, **kwargs)
571 # Handle mask propagation.
--> 572 previous_mask = _collect_previous_mask(inputs)
573 user_kwargs = copy.copy(kwargs)
574 if not _is_all_none(previous_mask):
{KERAS_DIR}/keras/engine/topology.pyc in _collect_previous_mask(input_tensors)
2701 inbound_layer, node_index, tensor_index = x._keras_history
2702 node = inbound_layer.inbound_nodes[node_index]
-> 2703 mask = node.output_masks[tensor_index]
2704 masks.append(mask)
2705 else:
IndexError: list index out of range
If I use the unpool function in an actual network, it crashes at the first convolution after unpooling because:

The channel dimension of the inputs should be defined. Found `None`.

Apparently keras somehow gets confused about the shape and thinks that _keras_shape is (None, None, None, None) at this line. Whereas it's actually (None, None, None, 64) right before during debugging.

But that's a different issue. Is there a workaround for the first problem (using proper layer classes for both the custom maxpool and the unpooling)?

