Skip to content

Instantly share code, notes, and snippets.

@Midnighter
Created May 12, 2014 13:06
Show Gist options
  • Save Midnighter/cf0edcd63a755873edf0 to your computer and use it in GitHub Desktop.
Save Midnighter/cf0edcd63a755873edf0 to your computer and use it in GitHub Desktop.
remote_abort
{
"metadata": {
"name": "",
"signature": "sha256:0e53fcc6596234459277ed6b1ad000f213b800ebfa369058a55077ce1f6c2ded"
},
"nbformat": 3,
"nbformat_minor": 0,
"worksheets": [
{
"cells": [
{
"cell_type": "heading",
"level": 1,
"metadata": {},
"source": [
"Abort a Remote Job Queue"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"import os\n",
"import sys\n",
"\n",
"from signal import SIGINT\n",
"\n",
"from IPython.parallel import (Client, interactive)"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 1
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"@interactive\n",
"def worker(val):\n",
" print(val)\n",
" for i in range(int(1E6)):\n",
" val ** val\n",
" return val"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 2
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"def main(remote_client):\n",
" d_view = remote_client.direct_view()\n",
" d_view.execute(\"import time\", block=True) # make level variable\n",
" l_view = remote_client.load_balanced_view()\n",
" results_it = l_view.map(worker, range(100), ordered=False, block=False)\n",
" print(sum(results_it))"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 3
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"remote_client = Client()\n",
"pid_map = remote_client[:].apply_async(os.getpid).get_dict()\n",
"try:\n",
" main(remote_client)\n",
"except: # we want to catch everything\n",
" (err, msg, trace) = sys.exc_info()\n",
" for (engine_id, pid) in pid_map.iteritems():\n",
" print(\"interrupting engine %d\" % engine_id)\n",
" try:\n",
" os.kill(pid, SIGINT)\n",
" except OSError:\n",
" print(\"didn't work\")\n",
" continue\n",
" raise err, msg, trace"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"interrupting engine 0\n",
"interrupting engine 1\n",
"interrupting engine 2\n",
"interrupting engine 3\n"
]
},
{
"ename": "KeyboardInterrupt",
"evalue": "",
"output_type": "pyerr",
"traceback": [
"\u001b[1;31m---------------------------------------------------------------------------\u001b[0m\n\u001b[1;31mKeyboardInterrupt\u001b[0m Traceback (most recent call last)",
"\u001b[1;32m<ipython-input-4-a824fc0770e6>\u001b[0m in \u001b[0;36m<module>\u001b[1;34m()\u001b[0m\n\u001b[0;32m 2\u001b[0m \u001b[0mpid_map\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mremote_client\u001b[0m\u001b[1;33m[\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mapply_async\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mos\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mgetpid\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mget_dict\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 3\u001b[0m \u001b[1;32mtry\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m----> 4\u001b[1;33m \u001b[0mmain\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mremote_client\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 5\u001b[0m \u001b[1;32mexcept\u001b[0m\u001b[1;33m:\u001b[0m \u001b[1;31m# we want to catch everything\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 6\u001b[0m \u001b[1;33m(\u001b[0m\u001b[0merr\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mmsg\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mtrace\u001b[0m\u001b[1;33m)\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0msys\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mexc_info\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
"\u001b[1;32m<ipython-input-3-93b28c5c6145>\u001b[0m in \u001b[0;36mmain\u001b[1;34m(remote_client)\u001b[0m\n\u001b[0;32m 4\u001b[0m \u001b[0ml_view\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mremote_client\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mload_balanced_view\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 5\u001b[0m \u001b[0mresults_it\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0ml_view\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mmap\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mworker\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mrange\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;36m1000\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mordered\u001b[0m\u001b[1;33m=\u001b[0m\u001b[0mFalse\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mblock\u001b[0m\u001b[1;33m=\u001b[0m\u001b[0mFalse\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m----> 6\u001b[1;33m \u001b[1;32mprint\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0msum\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mresults_it\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m",
"\u001b[1;32m/home/beber/.virtualenvs/base/lib/python2.7/site-packages/IPython/parallel/client/asyncresult.pyc\u001b[0m in \u001b[0;36m__iter__\u001b[1;34m(self)\u001b[0m\n\u001b[0;32m 592\u001b[0m \u001b[1;32mdef\u001b[0m \u001b[0m__iter__\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 593\u001b[0m \u001b[0mit\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_ordered_iter\u001b[0m \u001b[1;32mif\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mordered\u001b[0m \u001b[1;32melse\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_unordered_iter\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 594\u001b[1;33m \u001b[1;32mfor\u001b[0m \u001b[0mr\u001b[0m \u001b[1;32min\u001b[0m \u001b[0mit\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 595\u001b[0m \u001b[1;32myield\u001b[0m \u001b[0mr\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 596\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n",
"\u001b[1;32m/home/beber/.virtualenvs/base/lib/python2.7/site-packages/IPython/parallel/client/asyncresult.pyc\u001b[0m in \u001b[0;36m_unordered_iter\u001b[1;34m(self)\u001b[0m\n\u001b[0;32m 633\u001b[0m \u001b[1;32mpass\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 634\u001b[0m \u001b[1;31m# update ready set with those no longer outstanding:\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 635\u001b[1;33m \u001b[0mready\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mpending\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mdifference\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0m_client\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0moutstanding\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 636\u001b[0m \u001b[1;31m# update pending to exclude those that are finished\u001b[0m\u001b[1;33m\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 637\u001b[0m \u001b[0mpending\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mpending\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mdifference\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mready\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
"\u001b[1;31mKeyboardInterrupt\u001b[0m: "
]
}
],
"prompt_number": 4
},
{
"cell_type": "code",
"collapsed": false,
"input": [],
"language": "python",
"metadata": {},
"outputs": []
}
],
"metadata": {}
}
]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment