import numpy as np
import theano
import theano.tensor as T


# activation functions
d_afunc = { 'linear': lambda Y: Y,
            'sigmoid': T.nnet.sigmoid,
            'ReLu': lambda Y: T.switch( Y > 0, Y, 0 ) }


def randomstreams( seed ):

    return T.shared_randomstreams.RandomStreams( seed = seed )


########## input layer ##########

class InputLayer( object ):

    def __init__( self, D, dropout = 1.0 ):

        self.Din = D
        self.Nunit = D
        self.dropout = dropout


    def Top_output( self, X ):

        if self.dropout < 1.0:
            return X * self.dropout
        else:
            return X


    def Top_generateMask( self, rng ):

        return rng.uniform( ( self.Nunit, ) ) <= self.dropout


    def Top_outputMasked( self, X, mask ):

        return X * mask
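
# Note on the dropout convention used by InputLayer ( explanatory comment added here,
# not part of the original code ): during training, Top_generateMask draws a binary
# keep mask with keep probability `dropout` and Top_outputMasked multiplies the input
# by it; at test time, Top_output instead scales the input by the keep probability,
# so that expected activations match between training and test, as in the standard
# ( non-inverted ) dropout formulation.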


########## hidden layers ##########

class Layer( object ):

    def __init__( self, Din, Nunit, afunc, withBias = True, Wini = 0.01,
                  dropout = 1.0 ):

        self.Din = Din
        self.Nunit = Nunit
        self.afunc = afunc
        self.withBias = withBias
        self.dropout = dropout

        # making theano shared variables for weights & biases
        floatX = theano.config.floatX
        W = Wini * np.random.standard_normal( ( Nunit, Din ) )
        self.W = theano.shared( np.asarray( W, dtype = floatX ) )
        self.dW = theano.shared( np.zeros( ( Nunit, Din ), dtype = floatX ) )
        if self.withBias:
            self.b = theano.shared( np.zeros( Nunit, dtype = floatX ) )
            self.db = theano.shared( np.zeros( Nunit, dtype = floatX ) )

        # theano functions
        self.setWeight = self.Tfunc_setWeight()


    def Tfunc_setWeight( self ):

        W = T.matrix()
        if self.withBias:
            b = T.vector()
            inList = [ W, b ]
            upList = [ ( self.W, W ), ( self.b, b ) ]
        else:
            inList = [ W ]
            upList = [ ( self.W, W ) ]

        return theano.function( inList, None, updates = upList )


    def getWeight( self ):

        W = self.W.get_value()
        if self.withBias:
            b = self.b.get_value()
            return [ W, b ]
        else:
            return W


    def Top_outputRaw( self, X ):

        Y = T.dot( X, self.W.T )
        if self.withBias:
            Y += self.b
        Z = d_afunc[self.afunc]( Y )

        return Y, Z


    def Top_output( self, X ):

        Y, Z = self.Top_outputRaw( X )
        if self.dropout < 1.0:
            Z *= self.dropout

        return Y, Z


    def Top_generateMask( self, rng ):

        return rng.uniform( ( self.Nunit, ) ) <= self.dropout


    def Top_outputMasked( self, X, mask ):

        Y, Z = self.Top_outputRaw( X )

        return Y, Z * mask


    def T_update( self, cost, eta, mu, lam ):

        # gradient descent update with momentum ( mu ) and L2 weight decay ( lam )
        gradW = T.grad( cost, self.W )
        dWnew = -eta * ( gradW + lam * self.W ) + mu * self.dW
        Wnew = self.W + dWnew
        upList = [ ( self.W, Wnew ), ( self.dW, dWnew ) ]
        if self.withBias:
            gradb = T.grad( cost, self.b )
            # no weight decay for bias
            dbnew = -eta * gradb + mu * self.db
            bnew = self.b + dbnew
            upList += [ ( self.b, bnew ), ( self.db, dbnew ) ]

        return upList


    def T_updateMasked( self, cost, eta, mu, lam, mask ):

        # same update as T_update, but applied only to the rows of W ( and the
        # elements of b ) whose output unit survived dropout, as given by mask
        M = T.shape_padright( mask )
        gradW = T.grad( cost, self.W )
        dWnewOn = -eta * ( gradW + lam * self.W ) + mu * self.dW
        dWnew = T.switch( M, dWnewOn, self.dW )
        Wnew = T.switch( M, self.W + dWnew, self.W )
        upList = [ ( self.W, Wnew ), ( self.dW, dWnew ) ]

        if self.withBias:
            gradb = T.grad( cost, self.b )
            # no weight decay for bias
            dbnewOn = -eta * gradb + mu * self.db
            dbnew = T.switch( mask, dbnewOn, self.db )
            bnew = T.switch( mask, self.b + dbnew, self.b )
            upList += [ ( self.b, bnew ), ( self.db, dbnew ) ]

        return upList


    def T_updateMasked2( self, cost, eta, mu, lam, maskI, maskO ):

        # same update as T_update, but applied only to the weights whose input and
        # output units both survived dropout ( maskI and maskO respectively )
        M = T.outer( maskO, maskI )
        gradW = T.grad( cost, self.W )
        dWnewOn = -eta * ( gradW + lam * self.W ) + mu * self.dW
        dWnew = T.switch( M, dWnewOn, self.dW )
        Wnew = T.switch( M, self.W + dWnew, self.W )
        upList = [ ( self.W, Wnew ), ( self.dW, dWnew ) ]

        if self.withBias:
            gradb = T.grad( cost, self.b )
            # no weight decay for bias
            dbnewOn = -eta * gradb + mu * self.db
            dbnew = T.switch( maskO, dbnewOn, self.db )
            bnew = T.switch( maskO, self.b + dbnew, self.b )
            upList += [ ( self.b, bnew ), ( self.db, dbnew ) ]

        return upList


########## MLP ##########

class MLP( object ):

    def __init__( self, Layers, rng = None ):

        # layers - list of an InputLayer followed by Layer instances
        self.Layers = Layers
        assert isinstance( Layers[0], InputLayer )
        dropout = np.empty( len( Layers ) )
        for i in range( len( dropout ) ):
            dropout[i] = Layers[i].dropout
        self.withDropout = np.prod( dropout ) < 1.0

        # random number generator
        if rng is None:
            self.rng = randomstreams( 0 )
        else:
            self.rng = rng

        # theano functions
        self.output = self.Tfunc_output()
        self.cost = self.Tfunc_cost()
        self.train = self.Tfunc_train()


    # theano op for output computation ( for test )
    def Top_output( self, X ):

        # input layer
        layer = self.Layers[0]
        Zprev = layer.Top_output( X )

        # hidden layers
        for layer in self.Layers[1:]:
            Y, Z = layer.Top_output( Zprev )
            Zprev = Z

        # output
        Zsoftmax = T.nnet.softmax( Zprev )

        return Zsoftmax


    # theano function for output computation ( for test )
    def Tfunc_output( self ):

        X = T.matrix()    # N x D
        Z = self.Top_output( X )

        return theano.function( [ X ], Z )


    # theano op for cost computation ( error term )
    def Top_cost( self, Z, lab ):

        cost = T.nnet.categorical_crossentropy( Z, lab )

        return T.mean( cost )


    # theano function for cost computation
    def Tfunc_cost( self ):

        Z = T.matrix()      # N x K
        lab = T.ivector()   # N-dim

        return theano.function( [ Z, lab ], self.Top_cost( Z, lab ) )


    # theano function for gradient descent learning
    def Tfunc_train( self ):

        X = T.matrix( 'X' )          # N x D
        lab = T.ivector( 'lab' )     # N-dim
        eta = T.scalar( 'eta' )
        mu = T.scalar( 'mu' )
        lam = T.scalar( 'lambda' )

        '''
        if self.withDropout:
            maskList = []
        '''

        # input layer
        layer = self.Layers[0]
        if self.withDropout:
            mask = layer.Top_generateMask( self.rng )
            Zprev = layer.Top_outputMasked( X, mask )
            #maskList.append( mask )
        else:
            Zprev = layer.Top_output( X )

        # hidden layers
        for layer in self.Layers[1:]:
            if self.withDropout:
                mask = layer.Top_generateMask( self.rng )
                Y, Z = layer.Top_outputMasked( Zprev, mask )
                #maskList.append( mask )
            else:
                Y, Z = layer.Top_output( Zprev )
            Zprev = Z

        # output & cost
        Z = T.nnet.softmax( Zprev )
        cost = self.Top_cost( Z, lab )

        # updates list
        updatesList = []
        for i in range( len( self.Layers ) ):
            layer = self.Layers[i]
            if not isinstance( layer, InputLayer ):
                '''
                if self.withDropout:
                    maskI, maskO = maskList[i-1], maskList[i]
                    #updatesList += layer.T_updateMasked( cost, eta, mu, lam, maskO )
                    updatesList += layer.T_updateMasked2( cost, eta, mu, lam, maskI, maskO )
                else:
                    updatesList += layer.T_update( cost, eta, mu, lam )
                '''
                updatesList += layer.T_update( cost, eta, mu, lam )

        return theano.function( [ X, lab, eta, mu, lam ], [ Z, cost ], updates = updatesList )
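

# A minimal usage sketch ( illustrative addition, not part of the original module ):
# build an MLP from an InputLayer and two hidden Layer instances, run one training
# step on random dummy data, and evaluate the test-time output.  The dimensions and
# hyperparameters below are arbitrary placeholders.
if __name__ == '__main__':

    floatX = theano.config.floatX
    np.random.seed( 0 )

    N, D, K = 100, 20, 3                                   # samples, input dim, classes
    X = np.random.standard_normal( ( N, D ) ).astype( floatX )
    lab = np.random.randint( 0, K, N ).astype( np.int32 )  # ivector inputs expect int32

    layers = [ InputLayer( D ),
               Layer( D, 50, 'ReLu' ),
               Layer( 50, K, 'linear' ) ]                  # softmax is applied inside MLP
    mlp = MLP( layers )

    # one gradient descent step: learning rate, momentum, weight decay
    eta, mu, lam = 0.1, 0.9, 1e-4
    Z, cost = mlp.train( X, lab, eta, mu, lam )
    print( 'cost = %f' % cost )

    # test-time forward pass ( softmax outputs ) and accuracy
    Z = mlp.output( X )
    print( 'accuracy = %f' % np.mean( np.argmax( Z, axis = 1 ) == lab ) )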