Last active
October 17, 2018 20:53
Investigations into interrupting threads.
{ | |
"cells": [ | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"# `asyncio` Playground and Prototypes\n", | |
"\n", | |
"This notebook works out a strategy for calling arbitrary code in the context of a thread and interrupting that code.\n", | |
"\n", | |
"[The docs](https://docs.python.org/3/library/asyncio.html)" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Simple Example of Threading With Coroutines\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import asyncio\n", | |
"import time\n", | |
"from threading import Thread" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"We define two coroutines that can be called by our process:" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"async def coro_func(ii):\n", | |
"    \"\"\"Starts, sleeps for a little while, then returns.\"\"\"\n", | |
"    try:\n", | |
"        print(f'starting {ii}')\n", | |
"        await asyncio.sleep(2, 42)\n", | |
"        print(f'stopping {ii}')\n", | |
"    except asyncio.CancelledError:\n", | |
"        print(f'cancelled! {ii}')\n", | |
"\n", | |
"async def exec_(cmd):\n", | |
"    \"\"\"Executes arbitrary code.\"\"\"\n", | |
"    try:\n", | |
"        code = compile(cmd, '<string>', 'exec')\n", | |
"        # exec() always returns None, so there is nothing to await here\n", | |
"        exec(code)\n", | |
"    except asyncio.CancelledError:\n", | |
"        raise KeyboardInterrupt" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"Now we define a \"worker\" function that will host the event loop that runs on the thread:" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"def start_worker(loop):\n", | |
"    \"\"\"Switch to new event loop and run forever\"\"\"\n", | |
"\n", | |
"    asyncio.set_event_loop(loop)\n", | |
"    loop.run_forever()" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"Start up the thread:" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 4, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Create the new loop and worker thread\n", | |
"worker_loop = asyncio.new_event_loop() # asyncio.ProactorEventLoop()\n", | |
"worker = Thread(target=start_worker, args=(worker_loop, ))\n", | |
"# Start the thread\n", | |
"worker.start()" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"Here are two examples of the coroutines being called on the loop. Notice how `exec_ output: 2` is printed in between the `coro_func` output statements." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 5, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"starting 1\n" | |
] | |
}, | |
{ | |
"data": { | |
"text/plain": [ | |
"<Future at 0x1f1e031d470 state=pending>" | |
] | |
}, | |
"execution_count": 5, | |
"metadata": {}, | |
"output_type": "execute_result" | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"exec_ output: 2\n" | |
] | |
} | |
], | |
"source": [ | |
"asyncio.run_coroutine_threadsafe(coro_func(1), worker_loop)\n", | |
"asyncio.run_coroutine_threadsafe(exec_('print(\"exec_ output:\", 1+1)'), worker_loop)" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"Coroutines can be cancelled, but not while they are running blocking code:" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 6, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"starting 0\n", | |
"Canceled exec? True\n", | |
"exec_ start\n" | |
] | |
} | |
], | |
"source": [ | |
"fut = asyncio.run_coroutine_threadsafe(coro_func(0), worker_loop)\n", | |
"worker_loop.call_soon_threadsafe(fut.cancel)\n", | |
"\n", | |
"fut2 = asyncio.run_coroutine_threadsafe(exec_('print(\"exec_ start\")\\nfor ii in range(2000): time.sleep(0.001)\\nprint(\"exec_ stop\")'), worker_loop)\n", | |
"fut2.add_done_callback(lambda f: print('Canceled exec? ', f.cancelled()))\n", | |
"worker_loop.call_soon_threadsafe(fut2.cancel)\n", | |
"None" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"There is some odd behavior related to canceling `fut2`:\n", | |
"\n", | |
"- If you run this on a fresh kernel from the outset, then:\n", | |
"    - The `done` callback does not appear to be triggered at all.\n", | |
"    - _But_ `exec_ stop` is not printed either.\n", | |
"- If you re-run this on an already-started kernel:\n", | |
"    - It shows that the future was canceled.\n", | |
"    - It _does_ print `exec_ stop`.\n", | |
"\n", | |
"It appears that on the \"clean\" run the output from the callback and `exec_ stop` is actually printed in a later cell. From the experiment below (i.e. is 'done' printed?) I've concluded that the IPython interpreter is just having a hard time figuring out where to position the output coming from the thread, rather than the code, e.g., truly stopping." | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"### Graceful Interrupt of Thread??\n", | |
"\n", | |
"We can try to terminate and interrupt the thread gracefully with [killable threads](http://tomerfiliba.com/recipes/Thread2/). " | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 13, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import threading\n", | |
"import inspect\n", | |
"import ctypes\n", | |
"\n", | |
"\n", | |
"def _async_raise(tid, exctype):\n", | |
"    \"\"\"raises the exception, performs cleanup if needed\"\"\"\n", | |
"    if not inspect.isclass(exctype):\n", | |
"        raise TypeError(\"Only types can be raised (not instances)\")\n", | |
"    # the C API takes the thread id as an unsigned long (Python 3.7+); wrap it so ctypes does not truncate it\n", | |
"    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_ulong(tid), ctypes.py_object(exctype))\n", | |
"    if res == 0:\n", | |
"        raise ValueError(\"invalid thread id\")\n", | |
"    elif res != 1:\n", | |
"        # \"\"\"if it returns a number greater than one, you're in trouble,\n", | |
"        # and you should call it again with exc=NULL to revert the effect\"\"\"\n", | |
"        ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_ulong(tid), None)\n", | |
"        raise SystemError(\"PyThreadState_SetAsyncExc failed\")\n", | |
"\n", | |
"\n", | |
"class Thread2(threading.Thread):\n", | |
"    def _get_my_tid(self):\n", | |
"        \"\"\"determines this (self's) thread id\"\"\"\n", | |
"        if not self.is_alive():\n", | |
"            raise threading.ThreadError(\"the thread is not active\")\n", | |
"\n", | |
"        # do we have it cached?\n", | |
"        if hasattr(self, \"_thread_id\"):\n", | |
"            return self._thread_id\n", | |
"\n", | |
"        # no, look for it in the _active dict\n", | |
"        for tid, tobj in threading._active.items():\n", | |
"            if tobj is self:\n", | |
"                self._thread_id = tid\n", | |
"                return tid\n", | |
"\n", | |
"        raise AssertionError(\"could not determine the thread's id\")\n", | |
"\n", | |
"    def raise_exc(self, exctype):\n", | |
"        \"\"\"raises the given exception type in the context of this thread\"\"\"\n", | |
"        _async_raise(self._get_my_tid(), exctype)\n", | |
"\n", | |
"    def terminate(self):\n", | |
"        \"\"\"raises SystemExit in the context of the given thread, which should\n", | |
"        cause the thread to exit silently (unless caught)\"\"\"\n", | |
"        self.raise_exc(SystemExit)\n", | |
"\n", | |
"async def exec2_(cmd):\n", | |
"    \"\"\"Executes arbitrary code.\"\"\"\n", | |
"    try:\n", | |
"        code = compile(cmd, '<string>', 'exec')\n", | |
"        exec(code)\n", | |
"        to_return = 0\n", | |
"    except (KeyboardInterrupt, Exception) as e:\n", | |
"        to_return = -1\n", | |
"    finally:\n", | |
"        print(\"This space in 'finally' could be used to send the state to the process to the parent.\")\n", | |
"    return to_return\n", | |
"\n", | |
"def exec3_(cmd):\n", | |
"    \"\"\"Executes arbitrary code.\"\"\"\n", | |
"    try:\n", | |
"        code = compile(cmd, '<string>', 'exec')\n", | |
"        exec(code)\n", | |
"        to_return = 0\n", | |
"    except (KeyboardInterrupt, Exception) as e:\n", | |
"        to_return = -1\n", | |
"    finally:\n", | |
"        print(\"This space in 'finally' could be used to send the state to the process to the parent.\")\n", | |
"    return to_return" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 14, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Create the new loop and worker thread\n", | |
"worker_loop = asyncio.new_event_loop() # asyncio.ProactorEventLoop()\n", | |
"\n", | |
"worker = Thread2(target=start_worker, args=(worker_loop, ))\n", | |
"# Start the thread\n", | |
"worker.start()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 15, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Wtih terminator! exec_ start\n", | |
"This space in 'finally' could be used to send the state to the process to the parent.\n", | |
"Result from future: -1\n" | |
] | |
} | |
], | |
"source": [ | |
"ex1 = 'print(\"Wtih terminator! exec_ start\");time.sleep(2);print(\"Wtih terminator! exec_ stop\")'\n", | |
"fut3 = asyncio.run_coroutine_threadsafe(exec2_(ex1), worker_loop)\n", | |
"time.sleep(1)\n", | |
"worker.raise_exc(ValueError)\n", | |
"time.sleep(3)\n", | |
"print(f'Result from future: {fut3.result()}')" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 16, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Sync?? exec_ start\n", | |
"Sync?? exec_ stop\n", | |
"This space in 'finally' could be used to send the state to the process to the parent.\n" | |
] | |
} | |
], | |
"source": [ | |
"ex1 = 'print(\"Sync?? exec_ start\");time.sleep(2);print(\"Sync?? exec_ stop\")'\n", | |
"worker_loop.call_soon_threadsafe(exec3_, ex1)\n", | |
"time.sleep(3)" | |
] | |
}, | |
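{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"From the above example we can see that it is necessary to use coroutines here: `call_soon_threadsafe` only returns a `Handle`, so the return value of `exec3_` never reaches the caller, whereas `run_coroutine_threadsafe` returns a future that carries the result." | |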
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"### Chaining Futures?\n", | |
"\n", | |
"What if we chained a new future onto the end of a future once it completed?\n", | |
"\n", | |
"This won't really work, because `add_done_callback` returns `None`, so there is no way to access the newly created future:" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 17, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"True" | |
] | |
}, | |
"execution_count": 17, | |
"metadata": {}, | |
"output_type": "execute_result" | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"starting 0\n", | |
"stopping 0\n", | |
"starting 2\n", | |
"stopping 2\n" | |
] | |
} | |
], | |
"source": [ | |
"def create_new_future(ii, loop):\n", | |
"    return asyncio.run_coroutine_threadsafe(coro_func(ii), loop)\n", | |
"\n", | |
"fut4 = asyncio.run_coroutine_threadsafe(coro_func(0), worker_loop)\n", | |
"fut5 = fut4.add_done_callback(lambda f: create_new_future(2, worker_loop))\n", | |
"fut5 is None" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"Notice how this collected a lot of output from earlier cells: this was the active output cell when the thread printed contents to stdout." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 3", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.7.0" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 2 | |
} |
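The notebook's observation that coroutines can be cancelled at an `await` but not while they run blocking code can be reproduced outside Jupyter, where cell-output placement does not confuse the picture. The following is a minimal sketch, not the notebook's own code: `cooperative`, `blocking` and `run_demo` are hypothetical names, and a `threading.Event.wait` call stands in for the blocking `time.sleep` loop so that the ordering is deterministic.

```python
import asyncio
import threading


def start_worker(loop):
    """Run the given event loop forever on the current (worker) thread."""
    asyncio.set_event_loop(loop)
    loop.run_forever()


async def cooperative(events, started, done):
    """Awaits, so a cancellation can be delivered at the await point."""
    try:
        started.set()
        await asyncio.sleep(10)
        events.append("cooperative finished")
    except asyncio.CancelledError:
        events.append("cooperative cancelled")
        raise  # re-raise so the task itself ends up cancelled
    finally:
        done.set()


async def blocking(events, started, release, done):
    """Never awaits, so the loop cannot interrupt it once it is running."""
    started.set()
    release.wait(timeout=5)  # blocking call, stands in for time.sleep(...)
    events.append("blocking finished")
    done.set()


def run_demo():
    loop = asyncio.new_event_loop()
    worker = threading.Thread(target=start_worker, args=(loop,), daemon=True)
    worker.start()
    events = []

    # Case 1: cancel a coroutine that is suspended at an await.
    started, done = threading.Event(), threading.Event()
    fut_a = asyncio.run_coroutine_threadsafe(
        cooperative(events, started, done), loop)
    started.wait(timeout=5)
    fut_a.cancel()
    done.wait(timeout=5)

    # Case 2: cancel a coroutine that is stuck in blocking code.
    started, done = threading.Event(), threading.Event()
    release = threading.Event()
    fut_b = asyncio.run_coroutine_threadsafe(
        blocking(events, started, release, done), loop)
    started.wait(timeout=5)
    fut_b.cancel()   # the future is marked cancelled right away...
    release.set()    # ...but the blocking body still runs to completion
    done.wait(timeout=5)

    # Shut the worker loop down cleanly.
    loop.call_soon_threadsafe(loop.stop)
    worker.join(timeout=5)
    loop.close()
    return events, fut_a.cancelled(), fut_b.cancelled()


if __name__ == "__main__":
    print(run_demo())
```

Both futures report `cancelled()` as true, yet only the coroutine that awaits actually observes the cancellation. This matches the `Canceled exec?  True` line in the notebook output appearing even though the `exec_` body kept running.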