Skip to content

Instantly share code, notes, and snippets.

@stray-leone
Forked from mrocklin/kalman_mpi.py
Last active August 29, 2015 14:18
Show Gist options
  • Save stray-leone/49896df4aaf0fe67f11c to your computer and use it in GitHub Desktop.
Save stray-leone/49896df4aaf0fe67f11c to your computer and use it in GitHub Desktop.
import theano
from theano.tensor.io import send, recv, mpi_cmps
import theano.sandbox.linalg as linalg
from theano.gof.sched import sort_schedule_fn
from time import time
dot = theano.tensor.dot
dtype = 'float32'
n = 500
run = False
# Set up a linker that orders nodes to overlap computation and communication
mpi_scheduler = sort_schedule_fn(*mpi_cmps)
mpi_linker = theano.OpWiseCLinker(schedule=mpi_scheduler)
mpi_mode = theano.Mode(linker=mpi_linker)
# initialize MPI
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0 or not run:
# Create some input variables
mu = theano.tensor.matrix('mu')
Sigma = theano.tensor.matrix('Sigma')
H = theano.tensor.matrix('H')
R = theano.tensor.matrix('R')
data = theano.tensor.matrix('data')
input_shapes = { mu: (n, 1),
Sigma: (n, n),
H: (n, n),
R: (n, n),
data: (n, 1)}
# Some intermediate variables
A = dot(Sigma, H.T)
B = R + dot(H, dot(Sigma, H.T))
new_mu = mu + dot(A, linalg.solve(B, dot(H, mu) - data))
new_mu.name = "updated_mu"
# Send data to 1
receipts = send(H, 1, 1), send(B, 1, 2), send(Sigma, 1, 3), send(A, 1, 4)
# Get back the work that 1 did
new_Sigma = recv((n, n), dtype, 1, 5)
# Compile
inputs_0 = (mu, Sigma, H, R, data)
outputs_0 = (new_mu, new_Sigma) + receipts
f0 = theano.function(inputs_0, outputs_0, mode=mpi_mode)
nodes0 = f0.maker.linker.make_all()[-1] # for debug
if run:
# Generate random inputs
numeric_inputs = [np.random.rand(*input_shapes[inp]).astype(dtype)
for inp in inputs_0]
a, b, _, _, _, _ = f0(*numeric_inputs) # warm start
# Run and time
comm.barrier()
starttime = time()
a, b, _, _, _, _ = f0(*numeric_inputs)
comm.barrier()
endtime = time()
print endtime - starttime
if rank == 1 or not run:
# Receive some data from 0
H = recv((n, n), dtype, 0, 1)
B = recv((n, n), dtype, 0, 2)
Sigma = recv((n, n), dtype, 0, 3)
A = recv((n, n), dtype, 0, 4)
# Do some computation
new_Sigma = Sigma - dot(dot(A, linalg.solve(B, H)), Sigma)
new_Sigma.name = "updated_Sigma"
# Send it back to 0
receipt = send(new_Sigma, 0, 5)
# compile locally using Theano
inputs_1 = ()
outputs_1 = (receipt, )
f1 = theano.function(inputs_1, outputs_1, mode=mpi_mode)
nodes1 = f1.maker.linker.make_all()[-1] # for debug
if run:
_ = f1() # warm start
comm.barrier()
_ = f1()
comm.barrier()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment