Last active
May 25, 2018 02:52
-
-
Save ranr01/a9fd9678c9a917547e1116a6fce102fe to your computer and use it in GitHub Desktop.
Interrupt running tasks in IpyParallel engines
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import ipyparallel\n", | |
"client = ipyparallel.Client()\n", | |
"dview = client.direct_view()\n", | |
"lbview = client.load_balanced_view()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import zmq\n", | |
"import socket\n", | |
"import time \n", | |
"import numpy as np\n", | |
"\n", | |
"class ipp_iterrupter(object):\n", | |
" def __init__(self,client,port=2525):\n", | |
" self.my_ip = socket.gethostbyname(socket.gethostname())\n", | |
" self.SIGINT_filter = 'interrupt_'+str(np.random.randint(0,1e5)) \n", | |
" self.context = zmq.Context()\n", | |
" self.zmq_socket = self.context.socket(zmq.PUB)\n", | |
" self.zmq_socket.bind(\"tcp://*:\"+str(port))\n", | |
" self.client = client\n", | |
" self.port = port\n", | |
" \n", | |
" def _safe_to_interrupt(self):\n", | |
" d=self.client.queue_status()\n", | |
" return np.array([d[eid]['queue'] + d[eid]['tasks'] > 0 for eid in self.client.ids]).all()\n", | |
"\n", | |
" def interrupt(self):\n", | |
" # The problem with just sending the interrupt message is that if the signal arrives while the engine is idle,\n", | |
" # it will interrupt the kernel process itself and the engine will become unresponsive (die?).\n", | |
" # I guess this is because task execution is embedded inside a try block and exceptions are returned to the client,\n", | |
" # while during idle time exceptions are not handled and some listening process is interrupted.\n", | |
" # The simplest solution is to submit some dummy jobs, make sure all engines are active, and then send the interrupt signal.\n", | |
" self.client.abort()\n", | |
" time.sleep(1.)\n", | |
" a = self.client[:].apply_async(time.sleep,3)\n", | |
" time.sleep(1.)\n", | |
" if self._safe_to_interrupt():\n", | |
" self.zmq_socket.send_string(self.SIGINT_filter)\n", | |
" self.client.abort(block=True)\n", | |
" else:\n", | |
" raise RuntimeError(\"Unable to interrupt\")\n", | |
" \n", | |
" def __del__(self):\n", | |
" self.zmq_socket.close()\n", | |
" self.context.term()\n", | |
" \n", | |
" \n", | |
" " | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"<AsyncResult: create_interrupt_nanny>" | |
] | |
}, | |
"execution_count": 3, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"def create_interrupt_nanny(nanny_server_ip,nanny_server_port,SIGINT_filter):\n", | |
" from multiprocessing import Process\n", | |
" import os\n", | |
" \n", | |
" #target function for nanny process:\n", | |
" def interrupt_nanny(interrupt_pid,server_ip,server_port,SIGINT_filter):\n", | |
" # Socket to talk to server\n", | |
" import zmq\n", | |
" import os\n", | |
" import signal\n", | |
" context = zmq.Context()\n", | |
" socket = context.socket(zmq.SUB)\n", | |
" socket.connect(\"tcp://\"+server_ip+\":\"+str(server_port))\n", | |
" socket.setsockopt_string(zmq.SUBSCRIBE, SIGINT_filter)\n", | |
" while True:\n", | |
" string = socket.recv_string()\n", | |
" os.kill(interrupt_pid, signal.SIGINT)\n", | |
"\n", | |
" # run the target function in a daemon process so it will be killed when the kernel is(?)\n", | |
" p = Process(target=interrupt_nanny, \\\n", | |
" args=(os.getpid(),nanny_server_ip,nanny_server_port,SIGINT_filter),\\\n", | |
" daemon=True)\n", | |
" p.start()\n", | |
" #send a reference to the process to the engine global namespace\n", | |
" globals()['interrupt_nanny_process'] = p\n", | |
" \n", | |
"\n", | |
"interrupter = ipp_iterrupter(client)\n", | |
"dview.apply(create_interrupt_nanny,interrupter.my_ip,interrupter.port,interrupter.SIGINT_filter)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 47, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"%%px --local\n", | |
"import time\n", | |
"import numpy as np" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 76, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"['ok', 'error', 'error', 'error', 'error', 'ok', 'ok', 'ok']" | |
] | |
}, | |
"execution_count": 76, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"def sleep_random(t_min,t_max):\n", | |
" time.sleep(np.random.randint(t_min,t_max))\n", | |
" \n", | |
"a = dview.apply_async(sleep_random,3,12)\n", | |
"time.sleep(5.0)\n", | |
"interrupter.interrupt()\n", | |
"a.status" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 77, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"ename": "CompositeError", | |
"evalue": "one or more exceptions from call to method: sleep_random\n[1:apply]: KeyboardInterrupt: \n[2:apply]: KeyboardInterrupt: \n[3:apply]: KeyboardInterrupt: \n[4:apply]: KeyboardInterrupt: ", | |
"output_type": "error", | |
"traceback": [ | |
"[1:apply]: ", | |
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m\u001b[0;31mKeyboardInterrupt\u001b[0m Traceback (most recent call last)\u001b[0;32m<string>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m", | |
"\u001b[0;32m<ipython-input-76-3ad798163dcb>\u001b[0m in \u001b[0;36msleep_random\u001b[0;34m(t_min, t_max)\u001b[0m", | |
"\u001b[0;31mKeyboardInterrupt\u001b[0m: ", | |
"", | |
"[2:apply]: ", | |
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m\u001b[0;31mKeyboardInterrupt\u001b[0m Traceback (most recent call last)\u001b[0;32m<string>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m", | |
"\u001b[0;32m<ipython-input-76-3ad798163dcb>\u001b[0m in \u001b[0;36msleep_random\u001b[0;34m(t_min, t_max)\u001b[0m", | |
"\u001b[0;31mKeyboardInterrupt\u001b[0m: ", | |
"", | |
"[3:apply]: ", | |
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m\u001b[0;31mKeyboardInterrupt\u001b[0m Traceback (most recent call last)\u001b[0;32m<string>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m", | |
"\u001b[0;32m<ipython-input-76-3ad798163dcb>\u001b[0m in \u001b[0;36msleep_random\u001b[0;34m(t_min, t_max)\u001b[0m", | |
"\u001b[0;31mKeyboardInterrupt\u001b[0m: ", | |
"", | |
"[4:apply]: ", | |
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m\u001b[0;31mKeyboardInterrupt\u001b[0m Traceback (most recent call last)\u001b[0;32m<string>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m", | |
"\u001b[0;32m<ipython-input-76-3ad798163dcb>\u001b[0m in \u001b[0;36msleep_random\u001b[0;34m(t_min, t_max)\u001b[0m", | |
"\u001b[0;31mKeyboardInterrupt\u001b[0m: ", | |
"" | |
] | |
} | |
], | |
"source": [ | |
"a.get()" | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 3", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.6.3" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 2 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment