@mehaase
Last active January 15, 2020 22:33
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Trio Background Loop\n",
"\n",
"This notebook demonstrates how we have added experimental support for running a Trio loop in the background of a Jupyter notebook, so that a cell can finish running while a background task continues indefinitely. Later cells can interact with that background task.\n",
"\n",
"This notebook also explains _how the background loop is implemented_, including a few drawbacks, so that we can get feedback on the approach.\n",
"\n",
"## Demonstration"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"%autoawait trio\n",
"import trio"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"First, let's take a look at an example of a background task. This background task updates a global variable once every second. This lets us check whether the background loop is still running: if the background loop dies, then the global variable will stop being updated."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"background_task_heartbeat = 1\n",
"\n",
"async def background_task():\n",
" global background_task_heartbeat\n",
" while True:\n",
" await trio.sleep(1)\n",
" background_task_heartbeat += 1"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We also have a function to check that the background task is still alive by checking that the value of the global variable is still increasing."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"async def check_task():\n",
" x1 = background_task_heartbeat\n",
" await trio.sleep(1.1)\n",
" x2 = background_task_heartbeat\n",
" print('heartbeat: {}->{}\\nbackground task: {}'.format(x1, x2, \n",
" 'running' if x2 > x1 else 'stopped'))"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"heartbeat: 1->1\n",
"background task: stopped\n"
]
}
],
"source": [
"await check_task()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now we start the background task and verify that it is running.\n",
"\n",
"Note that `ipykernel` created the name `GLOBAL_NURSERY` and inserted into ``builtins`` so that we can access it here. This is not an ideal user experience, and feedback on this point is welcomed."
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"GLOBAL_NURSERY.start_soon(background_task)"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"heartbeat: 1->2\n",
"background task: running\n"
]
}
],
"source": [
"await check_task()"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"heartbeat: 13->14\n",
"background task: running\n"
]
}
],
"source": [
"await check_task()"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"heartbeat: 20->21\n",
"background task: running\n"
]
}
],
"source": [
"await check_task()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We paid careful attention to exception handling. The next few cells show that coroutines that raise exceptions do not crash the entire event loop. (*How we accomplished this* is described under **Implementation** below.)"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [],
"source": [
"async def background_exc(task_status=trio.TASK_STATUS_IGNORED):\n",
" task_status.started()\n",
" await trio.sleep(0)\n",
" raise Exception('foo')"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"ERROR:root:An exception occurred in a global nursery task.\n",
"Traceback (most recent call last):\n",
"\n",
" File \"<ipython-input-15-f54f58153246>\", line 4, in background_exc\n",
" raise Exception('foo')\n",
"\n",
"Exception: foo\n",
"\n"
]
}
],
"source": [
"await GLOBAL_NURSERY.start(background_exc)"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"heartbeat: 114->115\n",
"background task: running\n"
]
}
],
"source": [
"await check_task()"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"ERROR:root:An exception occurred in a global nursery task.\n",
"Traceback (most recent call last):\n",
"\n",
" File \"<ipython-input-15-f54f58153246>\", line 4, in background_exc\n",
" raise Exception('foo')\n",
"\n",
"Exception: foo\n",
"\n"
]
}
],
"source": [
"GLOBAL_NURSERY.start_soon(background_exc)"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"heartbeat: 126->127\n",
"background task: running\n"
]
}
],
"source": [
"await check_task()"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [
{
"ename": "Exception",
"evalue": "foo",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mException\u001b[0m Traceback (most recent call last)",
"\u001b[0;32m<ipython-input-21-9d28183b6037>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[0;32m----> 1\u001b[0;31m \u001b[0;32mawait\u001b[0m \u001b[0mbackground_exc\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
"\u001b[0;32m<ipython-input-15-f54f58153246>\u001b[0m in \u001b[0;36mbackground_exc\u001b[0;34m(task_status)\u001b[0m\n\u001b[1;32m 2\u001b[0m \u001b[0mtask_status\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstarted\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 3\u001b[0m \u001b[0;32mawait\u001b[0m \u001b[0mtrio\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msleep\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 4\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0mException\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'foo'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
"\u001b[0;31mException\u001b[0m: foo"
]
}
],
"source": [
"await background_exc()"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"heartbeat: 130->131\n",
"background task: running\n"
]
}
],
"source": [
"await check_task()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Implementation\n",
"\n",
"We briefly describe the implementation here. The complete code [can be found here](https://github.com/HyperionGray/ipython/tree/trio-loop). All of the changes are made in the `ipython` repository, not in `ipykernel`, however we have only tested our patch with Jupyter notebooks. We don't expect that this will affect IPython, and we aren't sure how it will affect other UIs built on top of `ipykernel`.\n",
"\n",
"> Side note: The current implementation is too hacky to be merged as-is, and it will require additional documentation and tests, of course. The implementation is only provided as a jumping-off point to describe how this feature should work and whether we need to adopt a different technical approach to implement the desired behavior.\n",
"\n",
"The overall approach is to run Trio in a background thread. When a cell is executed, the code is dispatched to the thread where the Trio event loop lives.\n",
"\n",
"The majority of changes are in `IPython/core/async_helpers.py`, specifically in the existing `_trio_runner(async_fn)` function, which is the main entrypoint for IPython to execute Trio code. This function is called with a callable function object as its argument.\n",
"\n",
"In `trio_runner`, we made the following changes:\n",
"\n",
"```diff\n",
" def _trio_runner(async_fn):\n",
" import trio\n",
" \n",
"+ if not _TRIO_TOKEN:\n",
"+ _init_trio(trio)\n",
"+\n",
" async def loc(coro):\n",
" \"\"\"\n",
" We need the dummy no-op async def to protect from\n",
" trio's internal. See https://github.com/python-trio/trio/issues/89\n",
" \"\"\"\n",
" return await coro\n",
" \n",
"- return trio.run(loc, async_fn)\n",
"+ return trio.from_thread.run(loc, async_fn, trio_token=_TRIO_TOKEN)\n",
"```\n",
"\n",
"Starting at the bottom, notice that `trio.run()` is gone—we don't start a new event loop for every cell—and replaced with code to schedule the function to run on the Trio thread instead. Trio uses a \"token\" to identify loops, and we have stashed a reference to the current event loop in the global variable `_TRIO_TOKEN`."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Where does `_TRIO_TOKEN` come from? It is initialized by `_init_trio(trio)`, which is a new function that starts up the event loop. Here's an excerpt from that function (it has been modified a bit from the committed version just to highlight what its doing.\n",
"\n",
"```python3\n",
"def _init_trio(trio):\n",
" async def trio_entry():\n",
" global _TRIO_TOKEN\n",
" _TRIO_TOKEN = trio.hazmat.current_trio_token()\n",
" async with trio.open_nursery() as nursery:\n",
" nursery._add_exc = log_nursery_exc\n",
" builtins.GLOBAL_NURSERY = nursery\n",
" await trio.sleep_forever()\n",
"\n",
" threading.Thread(target=trio.run, args=(trio_entry,)).start()\n",
"```\n",
"\n",
"Again, starting from the bottom, we create a new thread, and the thread entry point is `trio.run`, and it takes the nested function `async def trio_entry()` as its argument. The trio entry point:\n",
"\n",
"* Stashes the trio token in a global variable.\n",
"* Creates a new nursery.\n",
"* Stashes that nursery in `builtins` so it can be found from a notebook.\n",
"* Sleeps forever in order to keep the nursery open.\n",
"\n",
"From a user's point of view, they can run a task in the background by calling `await GLOBAL_NURSERY.start(...)` or `GLOBAL_NURSERY.start_soon(...)` just like in regular Trio code (except for the presence of a magical global nursery)."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**There are a few tactical problems with this approach.** Of course, the code is messy and a lot of things need to change before this can possibly be merged, but the following problems are higher level and require some discussion with both Trio and IPython maintainers before we can proceed.\n",
"\n",
"The first problem is that if any background task raises an exception, it will crash the nursery and the entire event loop will exit. This completely defeats the purpose of a long-running loop, because Trio has no concept of \"restarting\" the loop. Once the loop exits, all of the other background tasks will be gone, too.\n",
"\n",
"To quickly hack around this, we set `nursery._add_exc = log_nursery_exc`. This is a function that Trio calls internally when a task raises an exception, and it cancels all of the other tasks in that nursery. We monkey patch it with `log_nursery_exc`, which prints out the exception and then returns. Other background tasks keep running.\n",
"\n",
"The second problem is that IPython tries to be careful about only executing async cells on the event loop. Synchronous cells are still executed through the default code path, which is called the \"pseudo sync runner\". These cells are still executed on the main thread, which means synchronous trio APIs won't work. For example, if a cell contains:\n",
"\n",
"```python3\n",
"trio.current_time()\n",
"```\n",
"\n",
"Then the cell will raise an exception:\n",
"\n",
"```\n",
"RuntimeError Traceback (most recent call last)\n",
"<ipython-input-4-bcb964c06988> in <module>\n",
"----> 1 trio.current_time()\n",
"\n",
"~/code/trio-jupyter/venv/lib/python3.8/site-packages/trio/_core/_generated_run.py in current_time()\n",
" 50 return GLOBAL_RUN_CONTEXT.runner.current_time()\n",
" 51 except AttributeError:\n",
"---> 52 raise RuntimeError('must be called from async context')\n",
" 53 \n",
" 54 def current_clock():\n",
"\n",
"RuntimeError: must be called from async context\n",
"```\n",
"\n",
"This exception arises because Trio recognizes that there is no event loop on the current thread. To make things more confusing for the user, adding any async code to the block will make it work, even if the async code comes *after* the offending instruction!\n",
"\n",
"```python3\n",
"trio.current_time()\n",
"await trio.sleep(0)\n",
"```\n",
"\n",
"This error arises because synchronous code is executed on the main thread. Therefore, we also quickly patched `IPython/core/interactiveshell.py` to execute all cells on the Trio runner when the notebook is in Trio mode.\n",
"\n",
"```diff\n",
" # run_cell_async is async, but may not actually need an eventloop.\n",
" # when this is the case, we want to run it using the pseudo_sync_runner\n",
" # so that code can invoke eventloops (for example via the %run , and\n",
" # `%paste` magic.\n",
"- if self.should_run_async(raw_cell):\n",
"+ if self.should_run_async(raw_cell) or self.loop_runner is _trio_runner:\n",
" runner = self.loop_runner\n",
" else:\n",
" runner = _pseudo_sync_runner\n",
"```\n",
"\n",
"This fixes the problem, but it's not particularly elegant."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}