@michaelwooley
Last active October 17, 2018 20:53
Investigations into interrupting threads.
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# `asyncio` Playground and Prototypes\n",
"\n",
"This notebook works out a strategy for calling arbitrary code in the context of a thread and interrupting that code.\n",
"\n",
"[The docs](https://docs.python.org/3/library/asyncio.html)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Simple Example of Threading With Coroutines\n"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import asyncio\n",
"import time\n",
"from threading import Thread"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We define two coroutines that can be called by our process:"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"async def coro_func(ii):\n",
"    \"\"\"Starts, sleeps for a little bit of time, then returns.\"\"\"\n",
"    try:\n",
"        print(f'starting {ii}')\n",
"        await asyncio.sleep(2, 42)\n",
"        print(f'stopping {ii}')\n",
"    except asyncio.CancelledError:\n",
"        print(f'cancelled! {ii}')\n",
"\n",
"async def exec_(cmd):\n",
"    \"\"\"Executes arbitrary code.\"\"\"\n",
"    try:\n",
"        code = compile(cmd, '<string>', 'exec')\n",
"        # exec() always returns None, so there is nothing to await here.\n",
"        exec(code)\n",
"    except asyncio.CancelledError:\n",
"        raise KeyboardInterrupt"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now we define a \"worker\" function that will host the event loop that runs on the thread:"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"def start_worker(loop):\n",
"    \"\"\"Switch to new event loop and run forever\"\"\"\n",
"\n",
"    asyncio.set_event_loop(loop)\n",
"    loop.run_forever()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Start up the thread:"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"# Create the new loop and worker thread\n",
"worker_loop = asyncio.new_event_loop() # asyncio.ProactorEventLoop()\n",
"worker = Thread(target=start_worker, args=(worker_loop, ))\n",
"# Start the thread\n",
"worker.start()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Here are two examples of the coroutines being called on the loop. Notice how `exec_ output: 2` is printed right after `starting 1`, while `coro_func` is still in its two-second sleep."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"starting 1\n"
]
},
{
"data": {
"text/plain": [
"<Future at 0x1f1e031d470 state=pending>"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"exec_ output: 2\n"
]
}
],
"source": [
"asyncio.run_coroutine_threadsafe(coro_func(1), worker_loop)\n",
"asyncio.run_coroutine_threadsafe(exec_('print(\"exec_ output:\", 1+1)'), worker_loop)"
]
},
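{
"cell_type": "markdown",
"metadata": {},
"source": [
"A minimal sketch of how the calling thread can read back a result from a coroutine submitted this way. `asyncio.run_coroutine_threadsafe` returns a `concurrent.futures.Future`, so `result(timeout=...)` blocks the caller (not the loop) until the coroutine finishes. The names `add`, `demo_loop`, and `demo_thread` are hypothetical and kept separate from the worker above.\n",
"\n",
"```python\n",
"import asyncio\n",
"from threading import Thread\n",
"\n",
"\n",
"async def add(a, b):\n",
"    # Yield to the event loop once, then return a value.\n",
"    await asyncio.sleep(0.1)\n",
"    return a + b\n",
"\n",
"\n",
"# Hypothetical names: a fresh loop and thread, separate from the worker above.\n",
"demo_loop = asyncio.new_event_loop()\n",
"demo_thread = Thread(target=demo_loop.run_forever, daemon=True)\n",
"demo_thread.start()\n",
"\n",
"fut = asyncio.run_coroutine_threadsafe(add(1, 1), demo_loop)\n",
"value = fut.result(timeout=5)  # blocks the calling thread, not the loop\n",
"print('result:', value)\n",
"\n",
"# Stop the loop from the calling thread and wait for the thread to exit.\n",
"demo_loop.call_soon_threadsafe(demo_loop.stop)\n",
"demo_thread.join(timeout=5)\n",
"demo_loop.close()\n",
"```\n",
"\n",
"Reading the value through the future avoids relying on where the thread's `print` output ends up in the notebook."
]
},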
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Coroutines can be cancelled, but not while they are running blocking code:"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"starting 0\n",
"Canceled exec? True\n",
"exec_ start\n"
]
}
],
"source": [
"fut = asyncio.run_coroutine_threadsafe(coro_func(0), worker_loop)\n",
"worker_loop.call_soon_threadsafe(fut.cancel)\n",
"\n",
"fut2 = asyncio.run_coroutine_threadsafe(exec_('print(\"exec_ start\")\\nfor ii in range(2000): time.sleep(0.001)\\nprint(\"exec_ stop\")'), worker_loop)\n",
"fut2.add_done_callback(lambda f: print('Canceled exec? ', f.cancelled()))\n",
"worker_loop.call_soon_threadsafe(fut2.cancel)\n",
"None"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"There is some odd behavior related to canceling `fut2`:\n",
"\n",
"- If you run this on a fresh kernel from the outset:\n",
"    - The done callback is not triggered at all.\n",
"    - _But_ `exec_ stop` is not printed either.\n",
"- If you re-run this on an already-started kernel:\n",
"    - The callback reports that the future was canceled.\n",
"    - `exec_ stop` _is_ printed.\n",
"\n",
"On the \"clean\" run, the output from the callback and `exec_ stop` appears to be printed in a later cell. From the experiment below (i.e. is 'done' printed?) I've concluded that the IPython interpreter is just having a hard time figuring out where to place output coming from the thread, rather than the thread truly being stopped."
]
},
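{
"cell_type": "markdown",
"metadata": {},
"source": [
"The behavior described above can be isolated in a small sketch that does not depend on notebook output placement. It assumes a separate loop and thread (`demo_loop`, `demo_thread`) and a hypothetical coroutine `blocking`; two `threading.Event` objects make the timing deterministic. The outer future accepts the cancel request, yet the blocking body still runs to the end, because `time.sleep` never yields to the event loop.\n",
"\n",
"```python\n",
"import asyncio\n",
"import threading\n",
"import time\n",
"\n",
"log = []\n",
"started = threading.Event()\n",
"finished = threading.Event()\n",
"\n",
"\n",
"async def blocking():\n",
"    # time.sleep() never yields to the event loop, so a cancel request\n",
"    # cannot be delivered until the coroutine has already finished.\n",
"    started.set()\n",
"    time.sleep(0.3)\n",
"    log.append('blocking code ran to completion')\n",
"    finished.set()\n",
"\n",
"\n",
"demo_loop = asyncio.new_event_loop()  # hypothetical name\n",
"demo_thread = threading.Thread(target=demo_loop.run_forever, daemon=True)\n",
"demo_thread.start()\n",
"\n",
"fut = asyncio.run_coroutine_threadsafe(blocking(), demo_loop)\n",
"started.wait(timeout=5)\n",
"cancel_accepted = fut.cancel()  # the outer future is marked cancelled ...\n",
"finished.wait(timeout=5)        # ... but the body still runs to the end\n",
"print(cancel_accepted, fut.cancelled(), log)\n",
"\n",
"demo_loop.call_soon_threadsafe(demo_loop.stop)\n",
"demo_thread.join(timeout=5)\n",
"demo_loop.close()\n",
"```\n",
"\n",
"Replacing `time.sleep(0.3)` with `await asyncio.sleep(0.3)` gives the loop a point at which the cancellation can actually be raised inside the coroutine."
]
},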
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Graceful Interrupt of Thread??\n",
"\n",
"We can try to terminate and interrupt the thread gracefully with [killable threads](http://tomerfiliba.com/recipes/Thread2/). "
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"import threading\n",
"import inspect\n",
"import ctypes\n",
"\n",
"\n",
"def _async_raise(tid, exctype):\n",
"    \"\"\"raises the exception, performs cleanup if needed\"\"\"\n",
"    if not inspect.isclass(exctype):\n",
"        raise TypeError(\"Only types can be raised (not instances)\")\n",
"    # The C API expects the thread id as an unsigned long (Python 3.7+).\n",
"    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_ulong(tid), ctypes.py_object(exctype))\n",
"    if res == 0:\n",
"        raise ValueError(\"invalid thread id\")\n",
"    elif res != 1:\n",
"        # \"\"\"if it returns a number greater than one, you're in trouble,\n",
"        # and you should call it again with exc=NULL to revert the effect\"\"\"\n",
"        ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_ulong(tid), None)\n",
"        raise SystemError(\"PyThreadState_SetAsyncExc failed\")\n",
"\n",
"\n",
"class Thread2(threading.Thread):\n",
"    def _get_my_tid(self):\n",
"        \"\"\"determines this (self's) thread id\"\"\"\n",
"        # is_alive() replaces isAlive(), which was removed in Python 3.9\n",
"        if not self.is_alive():\n",
"            raise threading.ThreadError(\"the thread is not active\")\n",
"\n",
"        # do we have it cached?\n",
"        if hasattr(self, \"_thread_id\"):\n",
"            return self._thread_id\n",
"\n",
"        # no, look for it in the _active dict\n",
"        for tid, tobj in threading._active.items():\n",
"            if tobj is self:\n",
"                self._thread_id = tid\n",
"                return tid\n",
"\n",
"        raise AssertionError(\"could not determine the thread's id\")\n",
"\n",
"    def raise_exc(self, exctype):\n",
"        \"\"\"raises the given exception type in the context of this thread\"\"\"\n",
"        _async_raise(self._get_my_tid(), exctype)\n",
"\n",
"    def terminate(self):\n",
"        \"\"\"raises SystemExit in the context of the given thread, which should\n",
"        cause the thread to exit silently (unless caught)\"\"\"\n",
"        self.raise_exc(SystemExit)\n",
"\n",
"async def exec2_(cmd):\n",
"    \"\"\"Executes arbitrary code.\"\"\"\n",
"    try:\n",
"        code = compile(cmd, '<string>', 'exec')\n",
"        exec(code)\n",
"        to_return = 0\n",
"    except (KeyboardInterrupt, Exception) as e:\n",
"        to_return = -1\n",
"    finally:\n",
"        print(\"This space in 'finally' could be used to send the state to the process to the parent.\")\n",
"    return to_return\n",
"\n",
"def exec3_(cmd):\n",
"    \"\"\"Executes arbitrary code.\"\"\"\n",
"    try:\n",
"        code = compile(cmd, '<string>', 'exec')\n",
"        exec(code)\n",
"        to_return = 0\n",
"    except (KeyboardInterrupt, Exception) as e:\n",
"        to_return = -1\n",
"    finally:\n",
"        print(\"This space in 'finally' could be used to send the state to the process to the parent.\")\n",
"    return to_return"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"# Create the new loop and worker thread\n",
"worker_loop = asyncio.new_event_loop() # asyncio.ProactorEventLoop()\n",
"\n",
"worker = Thread2(target=start_worker, args=(worker_loop, ))\n",
"# Start the thread\n",
"worker.start()"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Wtih terminator! exec_ start\n",
"This space in 'finally' could be used to send the state to the process to the parent.\n",
"Result from future: -1\n"
]
}
],
"source": [
"ex1 = 'print(\"Wtih terminator! exec_ start\");time.sleep(2);print(\"Wtih terminator! exec_ stop\")'\n",
"fut3 = asyncio.run_coroutine_threadsafe(exec2_(ex1), worker_loop)\n",
"time.sleep(1)\n",
"worker.raise_exc(ValueError)\n",
"time.sleep(3)\n",
"print(f'Result from future: {fut3.result()}')"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Sync?? exec_ start\n",
"Sync?? exec_ stop\n",
"This space in 'finally' could be used to send the state to the process to the parent.\n"
]
}
],
"source": [
"ex1 = 'print(\"Sync?? exec_ start\");time.sleep(2);print(\"Sync?? exec_ stop\")'\n",
"worker_loop.call_soon_threadsafe(exec3_, ex1)\n",
"time.sleep(3)"
]
},
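{
"cell_type": "markdown",
"metadata": {},
"source": [
"From the above example we can see that it is necessary to use coroutines here: `call_soon_threadsafe` returns a handle rather than a future, so the value returned by `exec3_` never reaches the caller."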
]
},
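{
"cell_type": "markdown",
"metadata": {},
"source": [
"For reference, the interrupt mechanism used above can be sketched without the `Thread2` wrapper or an event loop. This is a CPython-only illustration under the assumption that `Thread.ident` is the id `PyThreadState_SetAsyncExc` expects; `busy_worker` and `events` are hypothetical names. The exception is only delivered while the target thread is executing Python bytecode, which is why the worker sleeps in short steps rather than in one long blocking call.\n",
"\n",
"```python\n",
"import ctypes\n",
"import threading\n",
"import time\n",
"\n",
"events = []\n",
"\n",
"\n",
"def busy_worker():\n",
"    try:\n",
"        # Short sleeps give the interpreter frequent chances to deliver\n",
"        # the asynchronously raised exception between bytecodes.\n",
"        while True:\n",
"            time.sleep(0.01)\n",
"    except ValueError:\n",
"        events.append('interrupted')\n",
"\n",
"\n",
"t = threading.Thread(target=busy_worker, daemon=True)\n",
"t.start()\n",
"time.sleep(0.1)\n",
"\n",
"# CPython-only C API call; returns the number of thread states modified.\n",
"n_threads = ctypes.pythonapi.PyThreadState_SetAsyncExc(\n",
"    ctypes.c_ulong(t.ident), ctypes.py_object(ValueError))\n",
"t.join(timeout=5)\n",
"print(n_threads, events)\n",
"```\n",
"\n",
"This is consistent with the `exec2_` run above, where the `ValueError` surfaced only once `time.sleep(2)` had returned and the second `print` was never reached."
]
},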
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Chaining Futures?\n",
"\n",
"What if we added a new future onto the end of a future when it completed?\n",
"\n",
"This won't really work, because `add_done_callback` returns `None` and the newly created future can't be accessed afterwards:"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 17,
"metadata": {},
"output_type": "execute_result"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"starting 0\n",
"stopping 0\n",
"starting 2\n",
"stopping 2\n"
]
}
],
"source": [
"def create_new_future(ii, loop):\n",
"    return asyncio.run_coroutine_threadsafe(coro_func(ii), loop)\n",
"\n",
"fut4 = asyncio.run_coroutine_threadsafe(coro_func(0), worker_loop)\n",
"fut5 = fut4.add_done_callback(lambda f: create_new_future(2, worker_loop))\n",
"fut5 is None"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Notice how this collected a lot of output from earlier cells: this was the active output cell when the thread printed contents to stdout."
]
},
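{
"cell_type": "markdown",
"metadata": {},
"source": [
"Returning to the chaining question: one possible workaround, sketched here under stated assumptions rather than taken from the experiments above, is to have the done callback publish the new future through a `queue.Queue`. The names `step`, `schedule_next`, `chained`, `demo_loop`, and `demo_thread` are hypothetical.\n",
"\n",
"```python\n",
"import asyncio\n",
"import queue\n",
"from threading import Thread\n",
"\n",
"\n",
"async def step(ii):\n",
"    await asyncio.sleep(0.05)\n",
"    return ii\n",
"\n",
"\n",
"demo_loop = asyncio.new_event_loop()  # hypothetical name\n",
"demo_thread = Thread(target=demo_loop.run_forever, daemon=True)\n",
"demo_thread.start()\n",
"\n",
"chained = queue.Queue()  # hands the second future back to the caller\n",
"\n",
"\n",
"def schedule_next(done_future):\n",
"    # add_done_callback() ignores this function's return value, so the\n",
"    # new future is published through the queue instead.\n",
"    chained.put(asyncio.run_coroutine_threadsafe(step(2), demo_loop))\n",
"\n",
"\n",
"first = asyncio.run_coroutine_threadsafe(step(0), demo_loop)\n",
"first.add_done_callback(schedule_next)\n",
"second = chained.get(timeout=5)\n",
"results = (first.result(timeout=5), second.result(timeout=5))\n",
"print(results)\n",
"\n",
"demo_loop.call_soon_threadsafe(demo_loop.stop)\n",
"demo_thread.join(timeout=5)\n",
"demo_loop.close()\n",
"```\n",
"\n",
"A simpler alternative, when both steps are known up front, is a single coroutine that awaits the first step and then the second, which yields one future for the whole chain."
]
},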
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.0"
}
},
"nbformat": 4,
"nbformat_minor": 2
}