Skip to content

Instantly share code, notes, and snippets.

@minrk
Created April 30, 2014 00:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save minrk/11415238 to your computer and use it in GitHub Desktop.
Save minrk/11415238 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"worksheets": [
{
"cells": [
{
"metadata": {},
"cell_type": "heading",
"source": "Passing results as input in IPython.parallel",
"level": 1
},
{
"metadata": {},
"cell_type": "markdown",
"source": "Passing results of one task as the input of another,\nwithout waiting for the first to finish.\n\nThis is not a use case that is particularly easy with IPython.parallel,\nbut there are a few ways it can be done.\n\nThe simplest way to do this is to persist the data on the engine,\nand run subsequent computation on the same engine where the data was loaded.\n\n"
},
{
"metadata": {},
"cell_type": "markdown",
"source": "As usual, start by creating a client and view"
},
{
"metadata": {},
"cell_type": "code",
"input": "from IPython import parallel\nrc = parallel.Client()\nview = rc.load_balanced_view()",
"prompt_number": 1,
"outputs": [],
"language": "python",
"trusted": true,
"collapsed": false
},
{
"metadata": {},
"cell_type": "markdown",
"source": "Now define our functions. We have three:\n\n1. load data ino the engine's namespace (a random array)\n2. do something with that data (get the 2-norm)\n3. remove data from the engine's namespace"
},
{
"metadata": {},
"cell_type": "code",
"input": "%px import numpy as np\n\ndef load_data(sz, name='data'):\n \"\"\"A function that loads data into a particular name in the global ns.\n \n In this case, a random array.\n \"\"\"\n data = np.random.random((sz,sz))\n globals()[name] = data\n\ndef get_norm(a):\n \"\"\"A simple analysis function, gets the 2-norm of an array\"\"\"\n return np.linalg.norm(a, 2)\n\ndef cleanup_name(name):\n \"\"\"Delete a name from the engine's namespace\"\"\"\n del globals()[name]",
"prompt_number": 2,
"outputs": [],
"language": "python",
"trusted": true,
"collapsed": false
},
{
"metadata": {},
"cell_type": "markdown",
"source": "Now we submit the work. The relevant bits:\n\n1. load-balance the data-loading tasks without restriction.\n2. use `temp_flags(follow=load_task)` to ensure that analysis happens on the same engine as the load.\n3. use `parallel.Reference` to get engine-local data as an argument to the analysis function, without moving any data around.\n4. cleanup the namspace after the analysis is done.\n\nWe submit all of this work before we know where the data-loading will take place."
},
{
"metadata": {},
"cell_type": "code",
"input": "loads = {}\nnorms = {}\nfor sz in [ 2 ** p for p in range(10) ]:\n # create unique names, to avoid collision\n name = 'data_%i' % sz\n # load the data into the global namespace\n ar = loads[sz] = view.apply_async(load_data, sz, name=name)\n # submit the analysis to the same engine\n with view.temp_flags(follow=ar):\n # use a Reference to get the engine-local data\n # as an argument to the function\n norms[sz] = ar2 = view.apply_async(get_norm, parallel.Reference(name))\n # submit a final task to cleanup anything we put in the global namespace\n with view.temp_flags(follow=ar2):\n view.apply_async(cleanup_name, name).get()",
"prompt_number": 3,
"outputs": [],
"language": "python",
"trusted": true,
"collapsed": false
},
{
"metadata": {},
"cell_type": "markdown",
"source": "Now we can display the results of our work:"
},
{
"metadata": {},
"cell_type": "code",
"input": "for sz in sorted(norms):\n ar = norms[sz]\n norm = ar.get()\n print(\"{0:>4}x{0:<4} 2-norm = {1:7.3f}\".format(sz, norm))",
"prompt_number": 4,
"outputs": [
{
"output_type": "stream",
"text": " 1x1 2-norm = 0.818\n 2x2 2-norm = 1.616\n 4x4 2-norm = 1.663\n 8x8 2-norm = 4.053\n 16x16 2-norm = 8.550\n 32x32 2-norm = 15.897\n 64x64 2-norm = 32.467\n 128x128 2-norm = 63.872\n 256x256 2-norm = 128.242\n 512x512 2-norm = 255.966\n",
"stream": "stdout"
}
],
"language": "python",
"trusted": true,
"collapsed": false
},
{
"metadata": {},
"cell_type": "markdown",
"source": "And finally, verify that we don't have any `data_foo` names lingering on any of the engines."
},
{
"metadata": {},
"cell_type": "code",
"input": "%px %who",
"prompt_number": 5,
"outputs": [
{
"output_type": "stream",
"text": "[stdout:0] Interactive namespace is empty.\n[stdout:1] Interactive namespace is empty.\n[stdout:2] Interactive namespace is empty.\n[stdout:3] Interactive namespace is empty.\n[stdout:4] Interactive namespace is empty.\n[stdout:5] Interactive namespace is empty.\n[stdout:6] Interactive namespace is empty.\n[stdout:7] Interactive namespace is empty.\n",
"stream": "stdout"
}
],
"language": "python",
"trusted": true,
"collapsed": false
}
],
"metadata": {}
}
],
"metadata": {
"name": "",
"signature": "sha256:16d61b1ac2e2bb438d687aa4bda7dd1cea13d2e344433b82d30b15ba89b962ca"
},
"nbformat": 3
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment