convnet.py
- convnet160327.py (latest revision)
- convnet160323.py
- convnet150907.py
- older versions can be found at https://gist.github.com/takatakamanbou/b65f0048160f0ed07985

The three revisions are listed below in chronological order: convnet150907.py first, followed by convnet160323.py and convnet160327.py.
import numpy as np | |
import theano | |
import theano.tensor as T | |
import theano.tensor.signal.downsample as Tsd | |
import nnet150903 as nnet | |
########## Input Layer for 4D tensor inputs ########## | |
class T4InputLayer( object ): | |
def __init__( self, Xdim, dropout = 1.0 ): | |
self.Xshape = Xdim # Xnch, Xrow, Xcol | |
self.dropout = dropout | |
def Top_output( self, X ): | |
if self.dropout < 1.0: | |
return X * self.dropout | |
else: | |
return X | |
def Top_generateMask( self, rng ): | |
return rng.uniform( self.Xshape ) <= self.dropout | |
def Top_outputMasked( self, X, mask ): | |
return X * mask | |
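# --- illustrative sketch (not part of the original module) -------------------
# The dropout convention used throughout this file: during training a layer
# multiplies its output by a Bernoulli(dropout) mask (Top_outputMasked), while
# at test time it scales the output by the keep probability instead
# (Top_output), so the expected activation is the same in both cases.  The
# shapes below are made up for illustration.
def _demo_dropout_scaling():
    rng = np.random.RandomState( 0 )
    keep = 0.8
    x = rng.standard_normal( ( 3, 28, 28 ) )         # one Xnch x Xrow x Xcol input
    mask = rng.uniform( size = x.shape ) <= keep     # cf. Top_generateMask
    return ( x * mask ).mean(), x.mean() * keep      # close for large inputs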
########## Convolution Layer ########## | |
class ConvLayer( object ): | |
def __init__( self, Xdim, Wdim, afunc, withBias = True, Wini = 0.01, | |
border_mode = 'valid', dropout = 1.0 ): | |
# dimension of the input | |
Xnch, Xrow, Xcol = Xdim | |
self.Xshape = Xdim | |
# dimension of the convolution filters | |
Wnch, Wrow, Wcol = Wdim | |
self.Wshape = ( Wnch, Xnch, Wrow, Wcol ) | |
# dimension of the output | |
if border_mode == 'valid': | |
Yrow, Ycol = Xrow - Wrow + 1, Xcol - Wcol + 1 | |
else: | |
Yrow, Ycol = Xrow + Wrow - 1, Xcol + Wcol - 1 | |
self.Yshape = ( Wnch, Yrow, Ycol ) | |
self.Dout = Wnch * Yrow * Ycol | |
# activation function of the layer | |
self.afunc = afunc | |
self.withBias = withBias | |
# border mode | |
assert border_mode in [ 'valid', 'full' ] | |
self.border_mode = border_mode | |
# dropout | |
self.dropout = dropout | |
# theano shared variables | |
floatX = theano.config.floatX | |
W = Wini * np.random.standard_normal( self.Wshape ) | |
self.W = theano.shared( np.asarray( W, dtype = floatX ) ) | |
self.dW = theano.shared( np.zeros( self.Wshape, dtype = floatX ) ) | |
if withBias: | |
self.b = theano.shared( np.zeros( Wnch, dtype = floatX ) ) | |
self.db = theano.shared( np.zeros( Wnch, dtype = floatX ) ) | |
    # Tfunc_setWeight(): under construction in this revision
    # (implemented in the later revisions below)
def getWeight( self ): | |
W = self.W.get_value() | |
if self.withBias: | |
b = self.b.get_value() | |
return [ W, b ] | |
else: | |
return W | |
def Top_outputRaw( self, X ): | |
# X: Ndat x Xshape, Y: Ndat x Yshape | |
Xs = ( None, self.Xshape[0], self.Xshape[1], self.Xshape[2] ) | |
Ws = self.Wshape | |
Y = T.nnet.conv.conv2d( X, self.W, image_shape = Xs, filter_shape = Ws, border_mode = self.border_mode ) | |
if self.withBias: | |
b = self.b.dimshuffle( 'x', 0, 'x', 'x' ) # 1 x nch x 1 x 1 | |
Y += b | |
Z = nnet.d_afunc[self.afunc]( Y ) | |
return Y, Z # Ndat x Yshape | |
def Top_output( self, X ): | |
Y, Z = self.Top_outputRaw( X ) | |
if self.dropout < 1.0: | |
Z *= self.dropout | |
return Y, Z | |
def Top_generateMask( self, rng ): | |
return rng.uniform( ( self.Yshape ) ) <= self.dropout | |
def Top_outputMasked( self, X, mask ): | |
Y, Z = self.Top_outputRaw( X ) | |
return Y, Z * mask | |
def T_update( self, cost, eta, mu, lam ): | |
gradW = T.grad( cost, self.W ) | |
dWnew = -eta * ( gradW + lam * self.W ) + mu * self.dW | |
Wnew = self.W + dWnew | |
upList = [ ( self.W, Wnew ), ( self.dW, dWnew ) ] | |
if self.withBias: | |
gradb = T.grad( cost, self.b ) | |
# no weight decay for bias | |
dbnew = -eta * gradb + mu * self.db | |
bnew = self.b + dbnew | |
upList += [ ( self.b, bnew ), ( self.db, dbnew ) ] | |
return upList | |
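# --- illustrative sketch (not part of the original module) -------------------
# T_update implements plain SGD with momentum mu and weight decay lam (the
# decay is applied to W but not to b):
#     dW_new = -eta * ( dcost/dW + lam * W ) + mu * dW_old
#     W_new  = W + dW_new
# A NumPy version of one step, with made-up values standing in for the
# symbolic gradients:
def _demo_sgd_update():
    eta, mu, lam = 0.01, 0.9, 1e-4
    W = np.ones( ( 8, 1, 5, 5 ) )
    dW_old = np.zeros_like( W )
    gradW = 0.1 * np.ones_like( W )                  # stands in for T.grad( cost, W )
    dW_new = -eta * ( gradW + lam * W ) + mu * dW_old
    return W + dW_new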
########## Pooling Layer ########## | |
class PoolLayer( object ): | |
def __init__( self, Xdim, ds, st = None, dropout = 1.0 ): | |
# dimension of the input | |
Xnch, Xrow, Xcol = Xdim | |
self.Xshape = Xdim | |
# parameters of the pooling layer | |
self.ds = ds | |
self.st = st | |
self.ignore_border = False | |
rv = Tsd.DownsampleFactorMax.out_shape( self.Xshape, ds, ignore_border = self.ignore_border, st = st ) | |
#self.Yshape = ( Xnch, rv[1], rv[2] ) | |
self.Yshape = tuple( rv ) | |
self.Dout = np.prod( self.Yshape ) | |
# dropout | |
self.dropout = dropout | |
def Top_outputRaw( self, X ): | |
# X: Ndat x Xshape | |
Y = Tsd.max_pool_2d( X, self.ds, ignore_border = self.ignore_border, st = self.st ) # Ndat x Yshape | |
return Y, Y | |
def Top_output( self, X ): | |
Y, Y = self.Top_outputRaw( X ) | |
if self.dropout < 1.0: | |
Y *= self.dropout | |
return Y, Y | |
def Top_generateMask( self, rng ): | |
return rng.uniform( ( self.Yshape ) ) <= self.dropout | |
def Top_outputMasked( self, X, mask ): | |
Y, Y = self.Top_outputRaw( X ) | |
Ymask = Y * mask | |
return Ymask, Ymask | |
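# --- illustrative sketch (not part of the original module) -------------------
# With ignore_border = False (as set in __init__) and the default stride
# st = ds, max pooling rounds the spatial size up, e.g. (8, 28, 28) with
# ds = (2, 2) becomes (8, 14, 14) and (8, 5, 5) becomes (8, 3, 3).  A quick
# check of Yshape / Dout with made-up sizes:
def _demo_pool_shape():
    pl = PoolLayer( ( 8, 5, 5 ), ( 2, 2 ) )
    return pl.Yshape, pl.Dout                        # expected: (8, 3, 3), 72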
########## Full-Connection Layer ########## | |
class FullLayer( nnet.Layer ): | |
def __init__( self, Din, Nunit, afunc, withBias = True, Wini = 0.01, | |
dropout = 1.0, T4toMat = False ): | |
super( FullLayer, self ).__init__( Din, Nunit, afunc, withBias, Wini, dropout ) | |
self.T4toMat = T4toMat | |
def super_Top_outputRaw( self, X ): | |
return super( FullLayer, self ).Top_outputRaw( X ) | |
def Top_outputRaw( self, X ): | |
if self.T4toMat: | |
return self.super_Top_outputRaw( X.reshape( ( X.shape[0], -1 ) ) ) | |
else: | |
return self.super_Top_outputRaw( X ) | |
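# --- illustrative sketch (not part of the original module) -------------------
# T4toMat = True makes the first fully-connected layer flatten each 4D sample
# ( Ndat x nch x row x col ) into a row vector before applying the weights, so
# Din must equal the Dout of the preceding conv/pool layer.  The shapes below
# are made up.
def _demo_t4tomat():
    Z4 = np.zeros( ( 10, 16, 4, 4 ) )                # e.g. output of a PoolLayer
    Z2 = Z4.reshape( ( Z4.shape[0], -1 ) )           # 10 x 256, as in Top_outputRaw
    return Z2.shape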
########## Convolutional Neural Net ########## | |
class CNN( object ): | |
def __init__( self, Layers, rng = None ): | |
# layers - list of Layer instances | |
self.Layers = Layers | |
assert isinstance( Layers[0], T4InputLayer ) | |
# random number generator | |
if rng == None: | |
self.rng = nnet.randomstreams( 0 ) | |
else: | |
self.rng = rng | |
# theano functions | |
self.output = self.Tfunc_output() | |
self.cost = self.Tfunc_cost() | |
self.train = self.Tfunc_train() | |
# theano op for output computation ( for test ) | |
def Top_output( self, X ): | |
# input layer | |
layer = self.Layers[0] | |
Zprev = layer.Top_output( X ) | |
# hidden layers | |
for layer in self.Layers[1:]: | |
Y, Z = layer.Top_output( Zprev ) | |
Zprev = Z | |
# output | |
Zsoftmax = T.nnet.softmax( Zprev ) | |
return Zsoftmax | |
# theano function for output computation ( for test ) | |
def Tfunc_output( self ): | |
X = T.tensor4() # Ndat x Xnch x Xrow x Xcol | |
Z = self.Top_output( X ) | |
return theano.function( [ X ] , Z ) | |
# theano op for cost computation ( error term ) | |
def Top_cost( self, Z, lab ): | |
cost = T.nnet.categorical_crossentropy( Z, lab ) | |
return T.mean( cost ) | |
# theano function for cost computation | |
def Tfunc_cost( self ): | |
Z = T.matrix() # N x K | |
lab = T.ivector() # N-dim | |
return theano.function( [ Z, lab ], self.Top_cost( Z, lab ) ) | |
# theano function for gradient descent learning | |
def Tfunc_train( self ): | |
X = T.tensor4( 'X' ) | |
lab = T.ivector( 'lab' ) | |
eta = T.scalar( 'eta' ) | |
mu = T.scalar( 'mu' ) | |
lam = T.scalar( 'lambda' ) | |
# input layer | |
layer = self.Layers[0] | |
if layer.dropout < 1.0: | |
mask = layer.Top_generateMask( self.rng ) | |
Zprev = layer.Top_outputMasked( X, mask ) | |
else: | |
Zprev = layer.Top_output( X ) | |
# hidden layers | |
for layer in self.Layers[1:]: | |
if layer.dropout < 1.0: | |
mask = layer.Top_generateMask( self.rng ) | |
Y, Z = layer.Top_outputMasked( Zprev, mask ) | |
else: | |
Y, Z = layer.Top_output( Zprev ) | |
Zprev = Z | |
# output & cost | |
Z = T.nnet.softmax( Zprev ) | |
cost = self.Top_cost( Z, lab ) | |
# updatesList | |
updatesList = [] | |
for layer in self.Layers: | |
if isinstance( layer, T4InputLayer ) or isinstance( layer, PoolLayer ): | |
continue | |
updatesList += layer.T_update( cost, eta, mu, lam ) | |
return theano.function( [ X, lab, eta, mu, lam ], [ Z, cost ], updates = updatesList ) |
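# --- illustrative usage sketch (not part of the original module) -------------
# A hypothetical end-to-end use of the classes above, assuming that
# nnet150903.d_afunc defines the activation keys 'ReLu' and 'linear'; the
# shapes and hyperparameters are made up.
def _demo_cnn150907():
    Xdim = ( 1, 28, 28 )
    l0 = T4InputLayer( Xdim )
    l1 = ConvLayer( Xdim, ( 8, 5, 5 ), 'ReLu' )      # -> 8 x 24 x 24
    l2 = PoolLayer( l1.Yshape, ( 2, 2 ) )            # -> 8 x 12 x 12
    l3 = FullLayer( l2.Dout, 100, 'ReLu', T4toMat = True )
    l4 = FullLayer( 100, 10, 'linear' )              # CNN applies softmax on top
    cnn = CNN( [ l0, l1, l2, l3, l4 ] )
    X = np.zeros( ( 32, 1, 28, 28 ), dtype = theano.config.floatX )
    lab = np.zeros( 32, dtype = np.int32 )
    Z, cost = cnn.train( X, lab, 0.01, 0.9, 1e-4 )   # eta, mu, lam
    return cnn.output( X ).shape                     # ( 32, 10 )
# The next listing is convnet160323.py, which replaces the deprecated
# theano.tensor.signal.downsample module with theano.tensor.signal.pool, gives
# each layer its own random stream, splits the outputs into Top_outputTrain /
# Top_outputInference, and adds Tfunc_initWeight / Tfunc_setWeight.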
import numpy as np | |
import theano | |
import theano.tensor as T | |
import theano.tensor.signal.pool as Tsp | |
import nnet151219 as nnet | |
########## Input Layer for 4D tensor inputs ########## | |
class T4InputLayer( object ): | |
def __init__( self, Xdim, rng = None, dropout = 1.0 ): | |
self.Xshape = Xdim # Xnch, Xrow, Xcol | |
self.dropout = dropout | |
if rng == None: | |
self.rng = nnet.randomstreams( 0 ) | |
else: | |
self.rng = rng | |
def Top_outputTrain( self, X ): | |
if self.dropout < 1.0: | |
mask = self.rng.uniform( self.Xshape ) <= self.dropout | |
return X * mask | |
else: | |
return X | |
def Top_outputInference( self, X ): | |
if self.dropout < 1.0: | |
return X * self.dropout | |
else: | |
return X | |
########## Convolution Layer ########## | |
class ConvLayer( object ): | |
def __init__( self, Xdim, Wdim, afunc, rng = None, withBias = True, | |
Wini = 0.01, border_mode = 'valid', dropout = 1.0 ): | |
# dimension of the input | |
Xnch, Xrow, Xcol = Xdim | |
self.Xshape = Xdim | |
# dimension of the convolution filters | |
Wnch, Wrow, Wcol = Wdim | |
self.Wshape = ( Wnch, Xnch, Wrow, Wcol ) | |
# dimension of the output | |
if border_mode == 'valid': | |
Yrow, Ycol = Xrow - Wrow + 1, Xcol - Wcol + 1 | |
else: | |
Yrow, Ycol = Xrow + Wrow - 1, Xcol + Wcol - 1 | |
self.Yshape = ( Wnch, Yrow, Ycol ) | |
self.Dout = Wnch * Yrow * Ycol | |
# activation function of the layer | |
self.afunc = afunc | |
self.withBias = withBias | |
# border mode | |
assert border_mode in [ 'valid', 'full' ] | |
self.border_mode = border_mode | |
# dropout | |
self.dropout = dropout | |
# random number generator | |
if rng == None: | |
self.rng = nnet.randomstreams( 0 ) | |
else: | |
self.rng = rng | |
# theano shared variables | |
floatX = theano.config.floatX | |
        # the random initialization itself is performed by self.initWeight( Wini )
        # below; self.W starts from ones so that Tfunc_initWeight's elementwise
        # multiplication yields values with standard deviation Wini
        self.W = theano.shared( np.ones( self.Wshape, dtype = floatX ) )
self.dW = theano.shared( np.zeros( self.Wshape, dtype = floatX ) ) | |
if withBias: | |
self.b = theano.shared( np.zeros( Wnch, dtype = floatX ) ) | |
self.db = theano.shared( np.zeros( Wnch, dtype = floatX ) ) | |
# theano functions | |
self.initWeight = self.Tfunc_initWeight() | |
self.setWeight = self.Tfunc_setWeight() | |
# weight initialization | |
self.initWeight( Wini ) | |
def Tfunc_initWeight( self ): | |
Wini = T.scalar() | |
        #W = self.rng.normal( self.Wshape, avg = 0.0, std = Wini )
        # The line above raises an error, so instead self.W (filled with ones in
        # __init__) is multiplied elementwise by a normal sample with std Wini,
        # which gives the same initialization.
        W = self.W * self.rng.normal( self.Wshape, avg = 0.0, std = Wini )
inList = [ Wini ] | |
upList = [ ( self.W, W ) ] | |
return theano.function( inList, None, updates = upList ) | |
def Tfunc_setWeight( self ): | |
W = T.tensor4() | |
if self.withBias: | |
b = T.vector() | |
inList = [ W, b ] | |
upList = [ ( self.W, W ), ( self.b, b ) ] | |
else: | |
inList = [ W ] | |
upList = [ ( self.W, W ) ] | |
return theano.function( inList, None, updates = upList ) | |
def getWeight( self ): | |
W = self.W.get_value() | |
if self.withBias: | |
b = self.b.get_value() | |
return [ W, b ] | |
else: | |
return W | |
def Top_outputRaw( self, X ): | |
# X: Ndat x Xshape, Y: Ndat x Yshape | |
Xs = ( None, self.Xshape[0], self.Xshape[1], self.Xshape[2] ) | |
Ws = self.Wshape | |
Y = T.nnet.conv2d( X, self.W, input_shape = Xs, filter_shape = Ws, border_mode = self.border_mode ) | |
if self.withBias: | |
b = self.b.dimshuffle( 'x', 0, 'x', 'x' ) # 1 x nch x 1 x 1 | |
Y += b | |
Z = nnet.d_afunc[self.afunc]( Y ) | |
return Y, Z # Ndat x Yshape | |
def Top_outputTrain( self, X ): | |
Y, Z = self.Top_outputRaw( X ) | |
if self.dropout < 1.0: | |
mask = self.rng.uniform( self.Yshape ) <= self.dropout | |
return Y, Z * mask | |
else: | |
return Y, Z | |
def Top_outputInference( self, X ): | |
Y, Z = self.Top_outputRaw( X ) | |
if self.dropout < 1.0: | |
return Y, Z * self.dropout | |
else: | |
return Y, Z | |
def T_update( self, cost, eta, mu, lam ): | |
gradW = T.grad( cost, self.W ) | |
dWnew = -eta * ( gradW + lam * self.W ) + mu * self.dW | |
Wnew = self.W + dWnew | |
upList = [ ( self.W, Wnew ), ( self.dW, dWnew ) ] | |
if self.withBias: | |
gradb = T.grad( cost, self.b ) | |
# no weight decay for bias | |
dbnew = -eta * gradb + mu * self.db | |
bnew = self.b + dbnew | |
upList += [ ( self.b, bnew ), ( self.db, dbnew ) ] | |
return upList | |
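# --- illustrative sketch (not part of the original module) -------------------
# Round trip of the filter weights through getWeight / setWeight, added in this
# revision.  The layer sizes and the activation key 'ReLu' are assumptions; use
# whatever keys nnet151219.d_afunc actually defines.
def _demo_weight_roundtrip():
    conv = ConvLayer( ( 1, 28, 28 ), ( 8, 5, 5 ), 'ReLu' )
    W, b = conv.getWeight()              # NumPy copies of the shared variables
    conv.setWeight( W * 0.5, b )         # overwrite them with modified values
    return conv.getWeight()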
########## Pooling Layer ########## | |
class PoolLayer( object ): | |
def __init__( self, Xdim, ds, st = None, rng = None, dropout = 1.0 ): | |
# dimension of the input | |
Xnch, Xrow, Xcol = Xdim | |
self.Xshape = Xdim | |
# parameters of the pooling layer | |
self.ds = ds | |
self.st = st | |
#self.ignore_border = False | |
self.ignore_border = True | |
rv = Tsp.Pool.out_shape( self.Xshape, ds, ignore_border = self.ignore_border, st = st ) | |
self.Yshape = tuple( rv ) | |
self.Dout = np.prod( self.Yshape ) | |
# dropout | |
self.dropout = dropout | |
# random number generator | |
if rng == None: | |
self.rng = nnet.randomstreams( 0 ) | |
else: | |
self.rng = rng | |
def Top_outputRaw( self, X ): | |
# X: Ndat x Xshape | |
Y = Tsp.pool_2d( X, self.ds, ignore_border = self.ignore_border, st = self.st ) # Ndat x Yshape | |
return Y, Y | |
def Top_outputTrain( self, X ): | |
Y, Y = self.Top_outputRaw( X ) | |
if self.dropout < 1.0: | |
mask = self.rng.uniform( self.Yshape ) <= self.dropout | |
Ymask = Y * mask | |
return Ymask, Ymask | |
else: | |
return Y, Y | |
def Top_outputInference( self, X ): | |
Y, Y = self.Top_outputRaw( X ) | |
if self.dropout < 1.0: | |
Ymask = Y * self.dropout | |
return Ymask, Ymask | |
else: | |
return Y, Y | |
########## Full-Connection Layer ########## | |
class FullLayer( nnet.Layer ): | |
def __init__( self, Din, Nunit, afunc, rng = None, withBias = True, | |
Wini = 0.01, dropout = 1.0, T4toMat = False ): | |
super( FullLayer, self ).__init__( Din, Nunit, afunc, rng = rng, withBias = withBias, Wini = Wini, dropout = dropout ) | |
self.T4toMat = T4toMat | |
def super_Top_outputRaw( self, X ): | |
return super( FullLayer, self ).Top_outputRaw( X ) | |
def Top_outputRaw( self, X ): | |
if self.T4toMat: | |
return self.super_Top_outputRaw( X.reshape( ( X.shape[0], -1 ) ) ) | |
else: | |
return self.super_Top_outputRaw( X ) | |
########## Convolutional Neural Net ########## | |
class CNN( object ): | |
def __init__( self, Layers ): | |
# layers - list of Layer instances | |
self.Layers = Layers | |
assert isinstance( Layers[0], T4InputLayer ) | |
# theano functions | |
self.output = self.Tfunc_output() | |
self.cost = self.Tfunc_cost() | |
self.train = self.Tfunc_train() | |
# theano op for output computation ( for inference ) | |
def Top_output( self, X ): | |
# input layer | |
layer = self.Layers[0] | |
Zprev = layer.Top_outputInference( X ) | |
# hidden layers | |
for layer in self.Layers[1:]: | |
Y, Z = layer.Top_outputInference( Zprev ) | |
Zprev = Z | |
# output | |
Zsoftmax = T.nnet.softmax( Zprev ) | |
return Zsoftmax | |
# theano function for output computation ( for inference ) | |
def Tfunc_output( self ): | |
X = T.tensor4() # Ndat x Xnch x Xrow x Xcol | |
Z = self.Top_output( X ) | |
return theano.function( [ X ] , Z ) | |
# theano op for cost computation ( error term ) | |
def Top_cost( self, Z, lab ): | |
cost = T.nnet.categorical_crossentropy( Z, lab ) | |
return T.mean( cost ) | |
# theano function for cost computation | |
def Tfunc_cost( self ): | |
Z = T.matrix() # N x K | |
lab = T.ivector() # N-dim | |
return theano.function( [ Z, lab ], self.Top_cost( Z, lab ) ) | |
# theano function for gradient descent learning | |
def Tfunc_train( self ): | |
X = T.tensor4( 'X' ) | |
lab = T.ivector( 'lab' ) | |
eta = T.scalar( 'eta' ) | |
mu = T.scalar( 'mu' ) | |
lam = T.scalar( 'lambda' ) | |
# input layer | |
layer = self.Layers[0] | |
Zprev = layer.Top_outputTrain( X ) | |
# hidden layers | |
for layer in self.Layers[1:]: | |
Y, Z = layer.Top_outputTrain( Zprev ) | |
Zprev = Z | |
# output & cost | |
Z = T.nnet.softmax( Zprev ) | |
cost = self.Top_cost( Z, lab ) | |
# updatesList | |
updatesList = [] | |
for layer in self.Layers: | |
if isinstance( layer, T4InputLayer ) or isinstance( layer, PoolLayer ): | |
continue | |
updatesList += layer.T_update( cost, eta, mu, lam ) | |
return theano.function( [ X, lab, eta, mu, lam ], [ Z, cost ], updates = updatesList ) |
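# --- illustrative sketch (not part of the original module) -------------------
# In this revision every layer owns a random stream for its dropout mask; a
# single stream can be shared by passing rng explicitly.  The layer sizes, the
# dropout rates and the activation keys 'ReLu' / 'linear' below are assumptions.
def _demo_shared_rng():
    rng = nnet.randomstreams( 0 )
    l0 = T4InputLayer( ( 1, 28, 28 ), rng = rng, dropout = 0.8 )
    l1 = ConvLayer( ( 1, 28, 28 ), ( 8, 5, 5 ), 'ReLu', rng = rng, dropout = 0.5 )
    l2 = FullLayer( l1.Dout, 10, 'linear', rng = rng, T4toMat = True )
    return CNN( [ l0, l1, l2 ] )
# The next listing is convnet160327.py, which forwards arbitrary keyword
# arguments to conv2d / pool_2d instead of fixing border_mode, stride and
# pooling mode, and computes the output shape via get_conv_output_shape.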
from __future__ import print_function | |
import numpy as np | |
import theano | |
import theano.tensor as T | |
import theano.tensor.signal.pool as Tsp | |
import nnet151219 as nnet | |
########## Input Layer for 4D tensor inputs ########## | |
class T4InputLayer( object ): | |
def __init__( self, Xdim, rng = None, dropout = 1.0 ): | |
self.Xshape = Xdim # Xnch, Xrow, Xcol | |
self.dropout = dropout | |
if rng == None: | |
self.rng = nnet.randomstreams( 0 ) | |
else: | |
self.rng = rng | |
def Top_outputTrain( self, X ): | |
if self.dropout < 1.0: | |
mask = self.rng.uniform( self.Xshape ) <= self.dropout | |
return X * mask | |
else: | |
return X | |
def Top_outputInference( self, X ): | |
if self.dropout < 1.0: | |
return X * self.dropout | |
else: | |
return X | |
########## Convolution Layer ########## | |
class ConvLayer( object ): | |
def __init__( self, Xdim, Wdim, afunc, withBias = True, Wini = 0.01, | |
rng = None, dropout = 1.0, **kwargs4conv2d ): | |
# dimension of the input | |
Xnch, Xrow, Xcol = Xdim | |
self.Xshape = Xdim | |
# dimension of the convolution filters | |
Wnch, Wrow, Wcol = Wdim | |
self.Wshape = ( Wnch, Xnch, Wrow, Wcol ) | |
# activation function & bias | |
self.afunc = afunc | |
self.withBias = withBias | |
# random number generator | |
if rng == None: | |
self.rng = nnet.randomstreams( 0 ) | |
else: | |
self.rng = rng | |
# dropout | |
self.dropout = dropout | |
        # keyword arguments for conv2d
        kwargs4conv2d['input_shape'] = ( None, Xnch, Xrow, Xcol )
        kwargs4conv2d['filter_shape'] = self.Wshape
        kwargs4conv2d.setdefault( 'border_mode', 'valid' )
        kwargs4conv2d.setdefault( 'subsample', ( 1, 1 ) )
        kwargs4conv2d.setdefault( 'filter_flip', True )
        self.kwargs4conv2d = kwargs4conv2d
# dimension of the output | |
args = ( self.kwargs4conv2d['input_shape'], | |
self.kwargs4conv2d['filter_shape'], | |
self.kwargs4conv2d['border_mode'], | |
self.kwargs4conv2d['subsample'] ) | |
rv = T.nnet.abstract_conv.get_conv_output_shape( *args ) | |
self.Yshape = rv[1:] # Wnch, Yrow, Ycol | |
self.Dout = np.prod( self.Yshape ) | |
# theano shared variables | |
floatX = theano.config.floatX | |
        # the random initialization itself is performed by self.initWeight( Wini )
        # below; self.W starts from ones so that Tfunc_initWeight's elementwise
        # multiplication yields values with standard deviation Wini
        self.W = theano.shared( np.ones( self.Wshape, dtype = floatX ) )
self.dW = theano.shared( np.zeros( self.Wshape, dtype = floatX ) ) | |
if withBias: | |
self.b = theano.shared( np.zeros( Wnch, dtype = floatX ) ) | |
self.db = theano.shared( np.zeros( Wnch, dtype = floatX ) ) | |
# theano functions | |
self.initWeight = self.Tfunc_initWeight() | |
self.setWeight = self.Tfunc_setWeight() | |
# weight initialization | |
self.initWeight( Wini ) | |
def Tfunc_initWeight( self ): | |
Wini = T.scalar() | |
        #W = self.rng.normal( self.Wshape, avg = 0.0, std = Wini )
        # The line above raises an error, so instead self.W (filled with ones in
        # __init__) is multiplied elementwise by a normal sample with std Wini,
        # which gives the same initialization.
        W = self.W * self.rng.normal( self.Wshape, avg = 0.0, std = Wini )
inList = [ Wini ] | |
upList = [ ( self.W, W ) ] | |
return theano.function( inList, None, updates = upList ) | |
def Tfunc_setWeight( self ): | |
W = T.tensor4() | |
if self.withBias: | |
b = T.vector() | |
inList = [ W, b ] | |
upList = [ ( self.W, W ), ( self.b, b ) ] | |
else: | |
inList = [ W ] | |
upList = [ ( self.W, W ) ] | |
return theano.function( inList, None, updates = upList ) | |
def getWeight( self ): | |
W = self.W.get_value() | |
if self.withBias: | |
b = self.b.get_value() | |
return [ W, b ] | |
else: | |
return W | |
def Top_outputRaw( self, X ): | |
Y = T.nnet.conv2d( X, self.W, **self.kwargs4conv2d ) | |
if self.withBias: | |
b = self.b.dimshuffle( 'x', 0, 'x', 'x' ) # 1 x nch x 1 x 1 | |
Y += b | |
Z = nnet.d_afunc[self.afunc]( Y ) | |
return Y, Z # Ndat x Yshape | |
def Top_outputTrain( self, X ): | |
Y, Z = self.Top_outputRaw( X ) | |
if self.dropout < 1.0: | |
mask = self.rng.uniform( self.Yshape ) <= self.dropout | |
return Y, Z * mask | |
else: | |
return Y, Z | |
def Top_outputInference( self, X ): | |
Y, Z = self.Top_outputRaw( X ) | |
if self.dropout < 1.0: | |
return Y, Z * self.dropout | |
else: | |
return Y, Z | |
def T_update( self, cost, eta, mu, lam ): | |
gradW = T.grad( cost, self.W ) | |
dWnew = -eta * ( gradW + lam * self.W ) + mu * self.dW | |
Wnew = self.W + dWnew | |
upList = [ ( self.W, Wnew ), ( self.dW, dWnew ) ] | |
if self.withBias: | |
gradb = T.grad( cost, self.b ) | |
# no weight decay for bias | |
dbnew = -eta * gradb + mu * self.db | |
bnew = self.b + dbnew | |
upList += [ ( self.b, bnew ), ( self.db, dbnew ) ] | |
return upList | |
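# --- illustrative sketch (not part of the original module) -------------------
# Extra keyword arguments are now forwarded to T.nnet.conv2d, so 'same'-style
# padding and strided convolution can be requested directly.  The sizes and the
# activation key 'ReLu' are assumptions.
def _demo_conv_kwargs():
    conv = ConvLayer( ( 3, 32, 32 ), ( 16, 3, 3 ), 'ReLu',
                      border_mode = 'half', subsample = ( 2, 2 ) )
    return conv.Yshape      # computed by get_conv_output_shape; here (16, 16, 16)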
########## Pooling Layer ########## | |
class PoolLayer( object ): | |
def __init__( self, Xdim, ds, rng = None, dropout = 1.0, **kwargs4pool_2d ): | |
# dimension of the input | |
Xnch, Xrow, Xcol = Xdim | |
self.Xshape = Xdim | |
# downscaling factor | |
self.ds = ds | |
# dropout | |
self.dropout = dropout | |
# random number generator | |
if rng == None: | |
self.rng = nnet.randomstreams( 0 ) | |
else: | |
self.rng = rng | |
        # keyword arguments for pool_2d
        kwargs4pool_2d.setdefault( 'ignore_border', True )
        if not kwargs4pool_2d['ignore_border']:
            print( '# PoolLayer warning: ignore_border is set to False' )
        kwargs4pool_2d.setdefault( 'st', None )
        kwargs4pool_2d.setdefault( 'padding', ( 0, 0 ) )
        kwargs4pool_2d.setdefault( 'mode', 'max' )
        if kwargs4pool_2d['mode'] != 'max':
            print( '# PoolLayer warning: mode != max' )
        self.kwargs4pool_2d = kwargs4pool_2d
# dimension of the output | |
kwargs = {} | |
for kn in ( 'ignore_border', 'st', 'padding' ): | |
kwargs[kn] = self.kwargs4pool_2d[kn] | |
rv = Tsp.Pool.out_shape( self.Xshape, ds, **kwargs ) | |
self.Yshape = tuple( rv ) | |
self.Dout = np.prod( self.Yshape ) | |
def Top_outputRaw( self, X ): | |
# X: Ndat x Xshape | |
Y = Tsp.pool_2d( X, self.ds, **self.kwargs4pool_2d ) # Ndat x Yshape | |
return Y, Y | |
def Top_outputTrain( self, X ): | |
Y, Y = self.Top_outputRaw( X ) | |
if self.dropout < 1.0: | |
mask = self.rng.uniform( self.Yshape ) <= self.dropout | |
Ymask = Y * mask | |
return Ymask, Ymask | |
else: | |
return Y, Y | |
def Top_outputInference( self, X ): | |
Y, Y = self.Top_outputRaw( X ) | |
if self.dropout < 1.0: | |
Ymask = Y * self.dropout | |
return Ymask, Ymask | |
else: | |
return Y, Y | |
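# --- illustrative sketch (not part of the original module) -------------------
# The pooling keyword arguments are likewise forwarded to pool_2d, so stride,
# padding and the pooling mode can be changed; a non-max mode only triggers the
# warning printed in __init__.  The sizes below are made up.
def _demo_pool_kwargs():
    pl = PoolLayer( ( 8, 12, 12 ), ( 3, 3 ), st = ( 1, 1 ),
                    padding = ( 1, 1 ), mode = 'average_inc_pad' )
    return pl.Yshape        # with this padding/stride the spatial size stays 12 x 12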
########## Full-Connection Layer ########## | |
class FullLayer( nnet.Layer ): | |
def __init__( self, Din, Nunit, afunc, rng = None, withBias = True, | |
Wini = 0.01, dropout = 1.0, T4toMat = False ): | |
super( FullLayer, self ).__init__( Din, Nunit, afunc, rng = rng, withBias = withBias, Wini = Wini, dropout = dropout ) | |
self.T4toMat = T4toMat | |
def super_Top_outputRaw( self, X ): | |
return super( FullLayer, self ).Top_outputRaw( X ) | |
def Top_outputRaw( self, X ): | |
if self.T4toMat: | |
return self.super_Top_outputRaw( X.reshape( ( X.shape[0], -1 ) ) ) | |
else: | |
return self.super_Top_outputRaw( X ) | |
########## Convolutional Neural Net ########## | |
class CNN( object ): | |
def __init__( self, Layers ): | |
# layers - list of Layer instances | |
self.Layers = Layers | |
assert isinstance( Layers[0], T4InputLayer ) | |
# theano functions | |
self.output = self.Tfunc_output() | |
self.cost = self.Tfunc_cost() | |
self.train = self.Tfunc_train() | |
# theano op for output computation ( for inference ) | |
def Top_output( self, X ): | |
# input layer | |
layer = self.Layers[0] | |
Zprev = layer.Top_outputInference( X ) | |
# hidden layers | |
for layer in self.Layers[1:]: | |
Y, Z = layer.Top_outputInference( Zprev ) | |
Zprev = Z | |
# output | |
Zsoftmax = T.nnet.softmax( Zprev ) | |
return Zsoftmax | |
# theano function for output computation ( for inference ) | |
def Tfunc_output( self ): | |
X = T.tensor4() # Ndat x Xnch x Xrow x Xcol | |
Z = self.Top_output( X ) | |
return theano.function( [ X ] , Z ) | |
# theano op for cost computation ( error term ) | |
def Top_cost( self, Z, lab ): | |
cost = T.nnet.categorical_crossentropy( Z, lab ) | |
return T.mean( cost ) | |
# theano function for cost computation | |
def Tfunc_cost( self ): | |
Z = T.matrix() # N x K | |
lab = T.ivector() # N-dim | |
return theano.function( [ Z, lab ], self.Top_cost( Z, lab ) ) | |
# theano function for gradient descent learning | |
def Tfunc_train( self ): | |
X = T.tensor4( 'X' ) | |
lab = T.ivector( 'lab' ) | |
eta = T.scalar( 'eta' ) | |
mu = T.scalar( 'mu' ) | |
lam = T.scalar( 'lambda' ) | |
# input layer | |
layer = self.Layers[0] | |
Zprev = layer.Top_outputTrain( X ) | |
# hidden layers | |
for layer in self.Layers[1:]: | |
Y, Z = layer.Top_outputTrain( Zprev ) | |
Zprev = Z | |
# output & cost | |
Z = T.nnet.softmax( Zprev ) | |
cost = self.Top_cost( Z, lab ) | |
# updatesList | |
updatesList = [] | |
for layer in self.Layers: | |
if isinstance( layer, T4InputLayer ) or isinstance( layer, PoolLayer ): | |
continue | |
updatesList += layer.T_update( cost, eta, mu, lam ) | |
return theano.function( [ X, lab, eta, mu, lam ], [ Z, cost ], updates = updatesList ) |
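# --- illustrative usage sketch (not part of the original module) -------------
# A minimal CIFAR-like configuration for this latest revision, assuming that
# nnet151219.d_afunc defines the keys 'ReLu' and 'linear' and that the data
# arrays use the dtypes Theano expects (floatX inputs, int32 labels).  Shapes
# and hyperparameters are made up.
if __name__ == '__main__':
    Xdim = ( 3, 32, 32 )
    l0 = T4InputLayer( Xdim )
    l1 = ConvLayer( Xdim, ( 16, 5, 5 ), 'ReLu' )          # -> 16 x 28 x 28
    l2 = PoolLayer( l1.Yshape, ( 2, 2 ) )                 # -> 16 x 14 x 14
    l3 = FullLayer( l2.Dout, 100, 'ReLu', dropout = 0.5, T4toMat = True )
    l4 = FullLayer( 100, 10, 'linear' )                   # CNN applies softmax on top
    cnn = CNN( [ l0, l1, l2, l3, l4 ] )

    X = np.zeros( ( 32, 3, 32, 32 ), dtype = theano.config.floatX )
    lab = np.zeros( 32, dtype = np.int32 )
    Z, cost = cnn.train( X, lab, 0.01, 0.9, 1e-4 )        # eta, mu, lam
    print( cnn.output( X ).shape )                        # ( 32, 10 )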