@minrk
Created May 30, 2012 05:30
Sample notebook for backgrounding a simulation on IPython engines
{
"metadata": {
"name": "BackgroundLoop"
},
"nbformat": 3,
"worksheets": [
{
"cells": [
{
"cell_type": "code",
"collapsed": false,
"input": [
"%pylab inline",
"",
"import sys, time",
"import numpy as np",
""
],
"language": "python",
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"Create the Client and View:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"from IPython import parallel",
"rc = parallel.Client(profile=\"mpi\")",
"dv = rc[:]",
"dv.block = True",
"dv.activate()",
"dv"
],
"language": "python",
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"Check for MPI. It's fine for this example if we don't have it, but it will technically be possible for",
"simulation nodes to get slightly out of sync."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%%px",
"try:",
" from mpi4py import MPI",
"except ImportError:",
" bcast = lambda buf: buf",
" barrier = lambda : None",
" rank = 0",
" print \"No MPI, dummy sims may get slightly out of sync\"",
"else:",
" mpi = MPI.COMM_WORLD",
" bcast = mpi.bcast",
" barrier = mpi.barrier",
" rank = mpi.rank",
" print \"MPI rank: %i/%i\" % (mpi.rank,mpi.size)"
],
"language": "python",
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"Quick test of the broadcast:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%px bcast(rank)"
],
"language": "python",
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"Define a function on the engines that holds the GIL for a period of time",
"(like time.sleep, but holding the GIL to simulate blocking computation):"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%%px",
"from cython import inline",
"",
"def gilsleep(t):",
" \"\"\"gil-holding sleep with cython.inline\"\"\"",
" code = '\\n'.join([",
" 'from posix cimport unistd',",
" 'unistd.sleep(t)',",
" ])",
" inline(code, quiet=True, t=t)",
"",
"gilsleep(1)"
],
"language": "python",
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"And define our dummy simulation, which just refines the appropriate slice of sin(x),",
"after grabbing the GIL for a period of time:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%%px",
"import numpy as np",
"",
"class DummySimulation(object):",
" def __init__(self, x1, x2, n, p, dt=1):",
" self.x1 = x1",
" self.x2 = x2",
" self.n = n",
" self.p = p",
" self.dt = dt",
" self.y = None",
" self.step = 0",
" ",
" def advance(self):",
" # include mpi barrier, for good measure",
" barrier()",
" self.step += 1",
" n = self.n",
" p = self.p",
" gilsleep(self.dt)",
" npoints = p * self.step",
" X = np.linspace(x1, x2, npoints)",
" x = X[n * npoints / p : (n+1) * npoints / p]",
" self.y = np.sin(x)",
" return self.step",
""
],
"language": "python",
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"Set up the global state (we are just sampling sin(x) on an interval), and instantiate the Simulation objects:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"x1 = 0",
"x2 = 10*np.pi",
"dv['x1'] = x1",
"dv['x2'] = x2",
"",
"dv.scatter('n', range(len(dv)), flatten=True)",
"dv['p'] = len(dv)",
"dv['step'] = 0",
"",
"%px sim = DummySimulation(x1, x2, n, p, dt=1)"
],
"language": "python",
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"Take the first step, to make sure everything is in working order:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%px sim.advance()"
],
"language": "python",
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"This is the function we are going to use to inspect our intermediate result.",
"It fetches the current value of the simulation step, and the value of 'y',",
"and makes a plot."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"from IPython.core.display import display",
"",
"def check_y():",
" \"\"\"fetch and plot the current state of the simulation\"\"\"",
" states = dv.apply_sync(lambda : (sim.step, sim.y))",
" steps, ys = zip(*states)",
" Y = np.concatenate(ys)",
" X = linspace(0, 10*np.pi, len(Y))",
" fig = plt.figure()",
" plt.title(\"step %i (%i points)\" % (steps[0], len(X)))",
" plt.plot(X, Y, 'o-')",
" display(fig)",
" plt.close(fig)"
],
"language": "python",
"outputs": []
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"check_y()"
],
"language": "python",
"outputs": []
},
{
"cell_type": "heading",
"level": 2,
"source": [
"Now the interesting part"
]
},
{
"cell_type": "markdown",
"source": [
"Here we start a Thread that advances the simulation *in the background*.",
"",
"The engines should be responsive on the timescale of the gilsleep threshold (**1 seconds**, here):"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%%px --block",
"from threading import Thread",
"",
"class BackgroundAdvance(Thread):",
" \"\"\"Call a function periodically in a thread, until self.stop() is called\"\"\"",
" def __init__(self, advance):",
" Thread.__init__(self)",
" self._done = False",
" self._paused = False",
" self._advance = advance",
" ",
" ",
" def stop(self):",
" self._done = True",
" ",
" def pause(self):",
" self._paused = True",
" ",
" def resume(self):",
" self._paused = False",
" ",
" def run(self):",
" while True:",
" # all stop together:",
" self._done = bcast(self._done)",
" if self._done:",
" return",
" if self._paused:",
" barrier()",
" time.sleep(1)",
" else:",
" self._advance()",
"",
"sim_thread = BackgroundAdvance(sim.advance)",
"sim_thread.start()",
""
],
"language": "python",
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"Note how that cell actually finished running.",
"",
"Now the simulation is running in a background thread, and we can check in on it with out check_y():"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"for i in range(6):",
" check_y()",
" time.sleep(2)"
],
"language": "python",
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"And we can stop the background thread with a simple call:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%px sim_thread.stop()"
],
"language": "python",
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"And hopefully all the engines are on the same step at the end:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%px sim.step"
],
"language": "python",
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"And hopefully they are all stopped, as well:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"%px sim_thread.is_alive()"
],
"language": "python",
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"And we can continue to advance the sim manually, as well:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"for i in range(5):",
" %px sim.advance()",
" check_y()",
" sys.stdout.flush()",
""
],
"language": "python",
"outputs": []
}
]
}
]
}
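
The background-thread pattern in the notebook can be tried without a cluster. The following is a minimal local sketch, not the notebook's own code: `BackgroundLoop` and `Counter` are hypothetical names introduced here, `threading.Event` flags replace the plain booleans, and there is no MPI `bcast`/`barrier`, so it only illustrates a single process. It also uses `time.sleep`, which releases the GIL, whereas the notebook's `gilsleep` deliberately holds it.

```python
import threading
import time


class BackgroundLoop(threading.Thread):
    """Call `step` repeatedly in a thread until stop() is called (hypothetical helper)."""

    def __init__(self, step, idle=0.01):
        super().__init__(daemon=True)
        self._step = step
        self._idle = idle                    # seconds to sleep per loop while paused
        self._done = threading.Event()       # set once stop() has been requested
        self._paused = threading.Event()     # set while the loop should idle

    def stop(self):
        self._done.set()

    def pause(self):
        self._paused.set()

    def resume(self):
        self._paused.clear()

    def run(self):
        while not self._done.is_set():
            if self._paused.is_set():
                time.sleep(self._idle)
            else:
                self._step()


class Counter:
    """Stand-in for the simulation: advance() takes a moment, then bumps a step count."""

    def __init__(self, dt=0.01):
        self.dt = dt
        self.step = 0

    def advance(self):
        time.sleep(self.dt)
        self.step += 1
        return self.step


sim = Counter()
loop = BackgroundLoop(sim.advance)
loop.start()                      # returns immediately; the loop keeps running
time.sleep(0.2)
seen_while_running = sim.step     # the main thread can inspect state at any time
loop.stop()
loop.join()                       # wait for the current step to finish
final_step = sim.step
print(seen_while_running, final_step, loop.is_alive())
```

As in the notebook, `stop()` only takes effect between steps, so the thread finishes whatever `advance()` call is in flight before it exits.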