Skip to content

Instantly share code, notes, and snippets.

@ranr01
Last active May 25, 2018 02:52
Show Gist options
  • Save ranr01/a9fd9678c9a917547e1116a6fce102fe to your computer and use it in GitHub Desktop.
Save ranr01/a9fd9678c9a917547e1116a6fce102fe to your computer and use it in GitHub Desktop.
Interrupt running tasks in IpyParallel engines
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import ipyparallel\n",
"client = ipyparallel.Client()\n",
"dview = client.direct_view()\n",
"lbview = client.load_balanced_view()"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"import zmq\n",
"import socket\n",
"import time \n",
"import numpy as np\n",
"\n",
"class ipp_iterrupter(object):\n",
" def __init__(self,client,port=2525):\n",
" self.my_ip = socket.gethostbyname(socket.gethostname())\n",
" self.SIGINT_filter = 'interrupt_'+str(np.random.randint(0,1e5)) \n",
" self.context = zmq.Context()\n",
" self.zmq_socket = self.context.socket(zmq.PUB)\n",
" self.zmq_socket.bind(\"tcp://*:\"+str(port))\n",
" self.client = client\n",
" self.port = port\n",
" \n",
" def _safe_to_interrupt(self):\n",
" d=self.client.queue_status()\n",
" return np.array([d[eid]['queue'] + d[eid]['tasks'] > 0 for eid in self.client.ids]).all()\n",
"\n",
" def interrupt(self):\n",
" # The problem with just sending the interrupt message is that if the signal arrives while the engine is idle,\n",
" # it will interrupt the kernel process itself and the engine will become unresponsive (die?).\n",
" # I guess this is because task execution is embedded inside a try block, so exceptions are returned to the client,\n",
" # while during idle time exceptions are not handled and some listening process is interrupted.\n",
" # The simplest solution is to submit some dummy jobs, make sure all engines are active, and then send the interrupt signal.\n",
" self.client.abort()\n",
" time.sleep(1.)\n",
" a = self.client[:].apply_async(time.sleep,3)\n",
" time.sleep(1.)\n",
" if self._safe_to_interrupt():\n",
" self.zmq_socket.send_string(self.SIGINT_filter)\n",
" self.client.abort(block=True)\n",
" else:\n",
" raise RuntimeError(\"Unable to interrupt\")\n",
" \n",
" def __del__(self):\n",
" self.zmq_socket.close()\n",
" self.context.term()\n",
" \n",
" \n",
" "
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<AsyncResult: create_interrupt_nanny>"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"def create_interrupt_nanny(nanny_server_ip,nanny_server_port,SIGINT_filter):\n",
" from multiprocessing import Process\n",
" import os\n",
" \n",
" #target function for nanny process:\n",
" def interrupt_nanny(interrupt_pid,server_ip,server_port,SIGINT_filter):\n",
" # Socket to talk to server\n",
" import zmq\n",
" import os\n",
" import signal\n",
" context = zmq.Context()\n",
" socket = context.socket(zmq.SUB)\n",
" socket.connect(\"tcp://\"+server_ip+\":\"+str(server_port))\n",
" socket.setsockopt_string(zmq.SUBSCRIBE, SIGINT_filter)\n",
" while True:\n",
" string = socket.recv_string()\n",
" os.kill(interrupt_pid, signal.SIGINT)\n",
"\n",
" # run the target function in a daemon process so it will be killed when the kernel is (?)\n",
" p = Process(target=interrupt_nanny, \\\n",
" args=(os.getpid(),nanny_server_ip,nanny_server_port,SIGINT_filter),\\\n",
" daemon=True)\n",
" p.start()\n",
" #send a reference to the process to the engine global namespace\n",
" globals()['interrupt_nanny_process'] = p\n",
" \n",
"\n",
"interrupter = ipp_iterrupter(client)\n",
"dview.apply(create_interrupt_nanny,interrupter.my_ip,interrupter.port,interrupter.SIGINT_filter)"
]
},
{
"cell_type": "code",
"execution_count": 47,
"metadata": {},
"outputs": [],
"source": [
"%%px --local\n",
"import time\n",
"import numpy as np"
]
},
{
"cell_type": "code",
"execution_count": 76,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['ok', 'error', 'error', 'error', 'error', 'ok', 'ok', 'ok']"
]
},
"execution_count": 76,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"def sleep_random(t_min,t_max):\n",
" time.sleep(np.random.randint(t_min,t_max))\n",
" \n",
"a = dview.apply_async(sleep_random,3,12)\n",
"time.sleep(5.0)\n",
"interrupter.interrupt()\n",
"a.status"
]
},
{
"cell_type": "code",
"execution_count": 77,
"metadata": {},
"outputs": [
{
"ename": "CompositeError",
"evalue": "one or more exceptions from call to method: sleep_random\n[1:apply]: KeyboardInterrupt: \n[2:apply]: KeyboardInterrupt: \n[3:apply]: KeyboardInterrupt: \n[4:apply]: KeyboardInterrupt: ",
"output_type": "error",
"traceback": [
"[1:apply]: ",
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m\u001b[0;31mKeyboardInterrupt\u001b[0m Traceback (most recent call last)\u001b[0;32m<string>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m",
"\u001b[0;32m<ipython-input-76-3ad798163dcb>\u001b[0m in \u001b[0;36msleep_random\u001b[0;34m(t_min, t_max)\u001b[0m",
"\u001b[0;31mKeyboardInterrupt\u001b[0m: ",
"",
"[2:apply]: ",
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m\u001b[0;31mKeyboardInterrupt\u001b[0m Traceback (most recent call last)\u001b[0;32m<string>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m",
"\u001b[0;32m<ipython-input-76-3ad798163dcb>\u001b[0m in \u001b[0;36msleep_random\u001b[0;34m(t_min, t_max)\u001b[0m",
"\u001b[0;31mKeyboardInterrupt\u001b[0m: ",
"",
"[3:apply]: ",
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m\u001b[0;31mKeyboardInterrupt\u001b[0m Traceback (most recent call last)\u001b[0;32m<string>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m",
"\u001b[0;32m<ipython-input-76-3ad798163dcb>\u001b[0m in \u001b[0;36msleep_random\u001b[0;34m(t_min, t_max)\u001b[0m",
"\u001b[0;31mKeyboardInterrupt\u001b[0m: ",
"",
"[4:apply]: ",
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m\u001b[0;31mKeyboardInterrupt\u001b[0m Traceback (most recent call last)\u001b[0;32m<string>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m",
"\u001b[0;32m<ipython-input-76-3ad798163dcb>\u001b[0m in \u001b[0;36msleep_random\u001b[0;34m(t_min, t_max)\u001b[0m",
"\u001b[0;31mKeyboardInterrupt\u001b[0m: ",
""
]
}
],
"source": [
"a.get()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment