Last active December 21, 2015 22:39
Comparison of a forward function written in theano with the numpy version.
from numpy import zeros, dot, exp, tanh, array, allclose
from numpy.random import randn
from copy import deepcopy
from time import time
from theano import tensor as T
from theano import function, shared, config
export THEANO_FLAGS=mode=FAST_RUN,device=gpu,floatX=float32
FLOAT_PRECISION = config.floatX
class Network:
def __init__(self,ni,ns,initial=0.1,maxlen=2500):
na = 1+ni+ns
self.dims = ni,ns,na
def init_variables(self,initial,maxlen=2500):
n = maxlen
ni,ns,na = self.dims
self.WGI = array(randn(ns,na)*initial, dtype=FLOAT_PRECISION)
self.WGO = array(randn(ns,na)*initial, dtype=FLOAT_PRECISION)
self.WCI = array(randn(ns,na)*initial, dtype=FLOAT_PRECISION)
self.source = array(zeros([n,na]), dtype=FLOAT_PRECISION)
self.cix = array(zeros([n,ns]), dtype=FLOAT_PRECISION) = array(zeros([n,ns]), dtype=FLOAT_PRECISION)
self.gix = array(zeros([n,ns]), dtype=FLOAT_PRECISION) = array(zeros([n,ns]), dtype=FLOAT_PRECISION)
self.gox = array(zeros([n,ns]), dtype=FLOAT_PRECISION)
self.go = array(zeros([n,ns]), dtype=FLOAT_PRECISION)
self.state = array(zeros([n,ns]), dtype=FLOAT_PRECISION)
self.output = array(zeros([n,ns]), dtype=FLOAT_PRECISION)
def forward(self,xs):
def ffunc(x):
return 1.0/(1.0+exp(-x))
ni,ns,na = self.dims
n = len(xs)
for t in range(n):
self.source[t,0] = 1
self.source[t,1:1+ni] = xs[t]
self.source[t,1+ni:] = zeros(ns)
dot(self.WCI,self.source[t],out=self.cix[t])[t] = ffunc(self.gix[t])[t] = tanh(self.cix[t])
self.state[t] =[t]*[t]
self.go[t] = ffunc(self.gox[t])
self.output[t] = tanh(self.state[t]) * self.go[t]
return self.output[:n]
class Network_Theano(Network):
def __init__(self, original_net, ni,ns,maxlen=2500):
na = 1+ni+ns
# to make sure, both networks produce the same results
self.initforwardTheano(ns, maxlen, na)
def copy_weights(self, original_net):
self.WGI = deepcopy(original_net.WGI)
self.WGO = deepcopy(original_net.WGO)
self.WCI = deepcopy(original_net.WCI)
def uploadweightsTheano(self):
self.TWGI_shared = shared(self.WGI)
self.TWGO_shared = shared(self.WGO)
self.TWCI_shared = shared(self.WCI)
def initforwardTheano(self, ns, n, na):
def Tffunc(x):
Tone = array([1.0], dtype=FLOAT_PRECISION)
return Tone/(Tone+T.exp(-x))
self.Toutput = shared(zeros([n, ns], dtype=FLOAT_PRECISION))
self.Tstate_shared = shared(zeros([n, ns], dtype=FLOAT_PRECISION))
self.Tgo_shared = shared(zeros([n, ns], dtype=FLOAT_PRECISION))
self.Tsource_shared = shared(zeros([n, na], dtype=FLOAT_PRECISION))
self.Tgi_shared = shared(zeros([n, ns], dtype=FLOAT_PRECISION))
self.Tci_shared = shared(zeros([n, ns], dtype=FLOAT_PRECISION))
self.Txs_shared = shared(zeros([n, ns], dtype=FLOAT_PRECISION))
Tstate = T.vector('Tstate')
Tt = T.iscalar('Tt')
Txs = self.Txs_shared[Tt]
Tzeros = T.zeros([ns], dtype=FLOAT_PRECISION)
Tsource = T.vector('Tsource')
Tone = array([1.0], dtype=FLOAT_PRECISION)
Tsource = T.concatenate([Tone, Txs, Tzeros])
Tgix =, Tsource)
Tgox =, Tsource)
Tcix =, Tsource)
Tgi = Tffunc(Tgix)
Tci = T.tanh(Tcix)
Tstate = Tci * Tgi
Tgo = Tffunc(Tgox)
TToutput = (self.Toutput, T.set_subtensor(self.Toutput[Tt], T.tanh(Tstate) * Tgo))
TTstate = (self.Tstate_shared, T.set_subtensor(self.Tstate_shared[Tt], Tstate))
TTsource = (self.Tsource_shared, T.set_subtensor(self.Tsource_shared[Tt], Tsource))
TTgo = (self.Tgo_shared, T.set_subtensor(self.Tgo_shared[Tt], Tgo))
TTgi = (self.Tgi_shared, T.set_subtensor(self.Tgi_shared[Tt], Tgi))
TTci = (self.Tci_shared, T.set_subtensor(self.Tci_shared[Tt], Tci))
updates = [TTgo, TToutput, TTstate, TTsource, TTgi, TTci]
self.Tforward = function([Tt], updates=updates)
def forward(self, xs):
n = len(xs)
for t in range(n):
# init
ninput = 48
nstates = 100
seqlength = 1000
network_orig = Network(ninput, nstates)
network_theano = Network_Theano(network_orig, ninput, nstates)
data = array(randn(seqlength, ninput), dtype=FLOAT_PRECISION)
# numpy
starttime = time()
output = network_orig.forward(data)
print "nympy takes {}s".format(time() - starttime)
# theano
starttime = time()
print "theano takes {}s".format(time() - starttime)
output_theano = network_theano.Toutput.get_value()[:seqlength]
#check the results
if not allclose(output, output_theano, rtol=1e-04, atol=1e-07):
import pdb; pdb.set_trace()
