@Vexs
Created March 24, 2021 22:13
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Why asyncio?\n",
"\n",
"## Networking speed:"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:23:59.920973Z",
"start_time": "2021-03-09T05:23:59.520404Z"
}
},
"outputs": [],
"source": [
"import requests\n",
"import aiohttp\n",
"import asyncio\n",
"from ipywidgets import widgets\n",
"from concurrent import futures"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:24:00.365551Z",
"start_time": "2021-03-09T05:24:00.213024Z"
},
"scrolled": true
},
"outputs": [],
"source": [
"r = requests.get('https://www.google.com/')"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:24:00.982262Z",
"start_time": "2021-03-09T05:24:00.844239Z"
}
},
"outputs": [],
"source": [
"async with aiohttp.ClientSession() as session:\n",
"    async with session.get('https://www.google.com/') as r:\n",
"        await r.read()"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:24:02.929750Z",
"start_time": "2021-03-09T05:24:01.466392Z"
}
},
"outputs": [],
"source": [
"session = requests.session()\n",
"\n",
"for _ in range(20):\n",
"    session.get('https://www.google.com/')"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:24:03.159289Z",
"start_time": "2021-03-09T05:24:02.956253Z"
}
},
"outputs": [],
"source": [
"async with aiohttp.ClientSession() as session:\n",
"\n",
"    async def get(url):\n",
"        async with session.get(url) as response:\n",
"            return await response.read()\n",
"\n",
"    responses = await asyncio.gather(*[get('https://www.google.com/') for _ in range(20)])\n",
"\n",
"    [r for r in responses]"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:24:03.798400Z",
"start_time": "2021-03-09T05:24:03.190294Z"
}
},
"outputs": [],
"source": [
"with futures.ThreadPoolExecutor(max_workers=4) as executor:\n",
"    responses = [executor.submit(lambda: requests.get(\"https://www.google.com/\")) for _ in range(20)]\n",
"\n",
"    [f.result().status_code for f in responses]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Request speed, overall:\n",
"<img src=\"requests performance.png\" width=\"500\" height=\"400\">\n",
"\n",
"As you can see, even for a single request aiohttp has an edge (probably because parts of it are implemented in C, unlike requests), and the edge grows substantially when you want to do multiple tasks at once."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## asyncpg (postgres) performance:\n",
"<img src=\"asyncpg%20performance.png\" width=\"500\" height=\"400\">\n",
"\n",
"[Source](https://gistpreview.github.io/?b8eac294ac85da177ff82f784ff2cb60)\n",
"\n",
"Postgres is another common tool, and [`asyncpg`](https://github.com/MagicStack/asyncpg) is extremely fast at working with it, not to mention the other benefits of async programming."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Great, what is it?\n",
"\n",
"Asyncio is an asynchronous concurrency framework: it runs multiple tasks concurrently, overlapping the time they spend waiting. To put it in a metaphor:\n",
"> Chess master Judit Polgár hosts a chess exhibition in which she plays multiple amateur players. She has two ways of conducting the exhibition: synchronously and asynchronously.\n",
">\n",
"> Assumptions:\n",
">\n",
"> - 24 opponents\n",
"> - Judit makes each chess move in 5 seconds\n",
"> - Opponents each take 55 seconds to make a move\n",
"> - Games average 30 pair-moves (60 moves total)\n",
">\n",
"> Synchronous version: Judit plays one game at a time, never two at the same time, until the game is complete. Each game takes (55 + 5) * 30 == 1800 seconds, or 30 minutes. The entire exhibition takes 24 * 30 == 720 minutes, or 12 hours.\n",
">\n",
"> Asynchronous version: Judit moves from table to table, making one move at each table. She leaves the table and lets the opponent make their next move during the wait time. One move on all 24 games takes Judit 24 * 5 == 120 seconds, or 2 minutes. The entire exhibition is now cut down to 120 * 30 == 3600 seconds, or just 1 hour.\n",
"\n",
"[Source](https://www.youtube.com/watch?v=iG6fr81xHKA&t=269s)\n",
"\n",
"To extend this metaphor, a truly parallel version would have a clone of Judit per table, which would theoretically bring the time to complete the exhibition down to 30 minutes! However, true parallelism is exceptionally hard. In fact, Python *doesn't actually have real threading.*"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Threading isn't real\n",
"Python threading isn't \"true\" threading, due to the GIL, the global interpreter lock. \n",
"The GIL prevents any two lines of Python code (strictly speaking, Python bytecode) from executing at the same time. This is done because of the way Python stores references to objects in memory; it prevents some nasty race conditions and helps thread safety. \n",
"\n",
"The downside of this is that Python code cannot utilize the full ability of a multi-core processor, but it does generally make single-core performance faster. \n",
"\n",
"Non-Python code, such as the C code that much of the stdlib and many third-party libraries like numpy, tensorflow, and pillow are implemented in, has parts that release the GIL and can run while the GIL is held for \"other\" things. Note that C code eventually has to return to Python code.\n",
"\n",
"Additionally, spinning up threads and context switches aren't free either."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### A note on multiprocessing\n",
"When multiprocessing, every subprocess has its own GIL, which allows for true parallelism, at the cost of being insufferable to use, particularly on Windows. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Or, in image form:\n",
"<img src=\"concurrency blocks.png\" width=\"500\" height=\"400\">\n",
"\n",
"Any overlapping blocks represent C code running. Python bytecode will *never* overlap. Additionally, the thread scheduler is much more regular than the async event loop, so this image is a bit misleading."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Great, how do we use it?"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:24:19.126770Z",
"start_time": "2021-03-09T05:24:19.124769Z"
}
},
"outputs": [],
"source": [
"async def func():\n",
"    await asyncio.sleep(5)\n",
"    return 'a'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"`async def` indicates that this function is a coroutine function (or an async generator function). \n",
"\n",
"`await` does two things: it tells the event loop that it may switch to another task until the awaitable being `await`ed has completed, and when it has, it gets the return value and continues. \n",
"\n",
"`async with` is less common; it enters an asynchronous context manager, usually used to handle cleanup of something. \n",
"\n",
"`async for` is for iterating over an asynchronous generator.\n",
"\n",
"`return` does exactly as you'd think."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"When we just call the coroutine function `func()` we see that it returns a coroutine:"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:24:19.202283Z",
"start_time": "2021-03-09T05:24:19.153274Z"
}
},
"outputs": [
{
"data": {
"text/plain": [
"<coroutine object func at 0x0000021007713940>"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"func() "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"By `await`ing the returned coroutine, we actually schedule the coroutine to happen and run it- it then waits for a return value."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:24:24.616566Z",
"start_time": "2021-03-09T05:24:19.612348Z"
}
},
"outputs": [
{
"data": {
"text/plain": [
"'a'"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"await func()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You cannot use the `await`, `async with` and `async for` keywords outside of a coroutine function:"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:24:24.648072Z",
"start_time": "2021-03-09T05:24:24.644572Z"
}
},
"outputs": [
{
"ename": "SyntaxError",
"evalue": "'await' outside async function (<ipython-input-13-3c72ee9da339>, line 2)",
"output_type": "error",
"traceback": [
"\u001b[1;36m File \u001b[1;32m\"<ipython-input-13-3c72ee9da339>\"\u001b[1;36m, line \u001b[1;32m2\u001b[0m\n\u001b[1;33m await asyncio.sleep(1)\u001b[0m\n\u001b[1;37m ^\u001b[0m\n\u001b[1;31mSyntaxError\u001b[0m\u001b[1;31m:\u001b[0m 'await' outside async function\n"
]
}
],
"source": [
"def x():\n",
" await asyncio.sleep(1)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## A note on jupyter:\n",
"Because we're in a Jupyter notebook, we can use `await`, `async with` and `async for` as much as we please, because Jupyter (or rather, `ipython`) is already running an event loop! In a normal Python environment you can't do this, and must instead have a start point as defined in [Entry points.](#Entry-points)\n",
"\n",
"Several other common Python environments, such as Conda and Spyder, also create event loops. It can be useful to use [`nest_asyncio`](https://github.com/erdewit/nest_asyncio) in such situations."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Definition: Coroutine, coroutine function\n",
"Like a regular function, but cooler. <br>\n",
"Coroutines are a generalized form of a subroutine, and can be entered and exited at multiple points. <br>\n",
"A coroutine function is a function that returns a coroutine, and may contain `await`, `async for` and `async with` keywords.<br>\n",
"**Key point: coroutine functions are defined with `async def`**<br>\n",
"Making a function a coroutine does not make the code inside it non-blocking (defined later).<br>"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Definition: awaitable\n",
"\n",
"An `awaitable` is any object that can be `await`ed. These include, most commonly, `coroutines` and `tasks`.\n",
"Tasks are wrapped coroutines that are scheduled to happen immediately. More on these later."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Definition: blocking\n",
"\n",
"Blocking code is any code that takes more than 0.1 seconds to return or yield back to the event loop. This prevents asynchronous programs from doing anything else in the meantime. Defined further in [blocking](#Blocking)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Entry points\n",
"All asynchronous programs need an \"entry point\" of sorts. Because we're in a Jupyter notebook, we can `await` natively, as there is an event loop running in the background. However, in a normal Python file we need to create this ourselves, such as with the following example.\n",
"\n",
"```py\n",
"import asyncio\n",
"\n",
"loop = asyncio.get_event_loop()\n",
"\n",
"async def entry_point():\n",
"    await some_coroutine()\n",
"\n",
"loop.run_until_complete(entry_point())\n",
"```\n",
"\n",
"`asyncio.get_event_loop()` retrieves the asyncio event loop if it exists and, if it doesn't, creates it. \n",
"`loop.run_until_complete()` runs the coroutine passed and blocks until it has completed. It cannot be used if the loop is already running. \n",
"\n",
"If you are using python 3.7+, you can replace this with [`asyncio.run()`](https://docs.python.org/3/library/asyncio-task.html#asyncio.run)"
]
},
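{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a rough sketch of the `asyncio.run()` form for a normal Python file (not a notebook, where a loop is already running): `some_coroutine` below is a made-up stand-in for whatever asynchronous work your program does, not something asyncio provides.\n",
"\n",
"```py\n",
"import asyncio\n",
"\n",
"async def some_coroutine():\n",
"    # Hypothetical placeholder for real asynchronous work.\n",
"    await asyncio.sleep(0.1)\n",
"    return 'done'\n",
"\n",
"async def entry_point():\n",
"    return await some_coroutine()\n",
"\n",
"if __name__ == '__main__':\n",
"    # asyncio.run() creates a new event loop, runs the coroutine to completion,\n",
"    # and closes the loop afterwards.\n",
"    print(asyncio.run(entry_point()))\n",
"```\n",
"\n",
"Like `loop.run_until_complete()`, `asyncio.run()` cannot be called while a loop is already running, so inside Jupyter you would simply `await entry_point()` instead."
]
},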
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### A note on the event loop\n",
"The event loop is the scheduling queue asyncio runs on: when you run a coroutine, you schedule it to happen at some point in the future, and the asyncio event loop runs it at some point. Broadly speaking, you don't need to worry about it too much.\n",
"\n",
"The only thing you really need to keep in mind is that a coroutine from one event loop can't be used in another event loop, such as when dealing with threads.\n",
"\n",
"Additionally, there are multiple implementations of event loops: some can cover multiple threads, and some are [significantly faster, like uvloop](https://github.com/MagicStack/uvloop)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Running multiple coroutines\n",
"\n",
"So, asyncio allows us to run multiple things at the same time: but how do we do that?\n",
"### Creating tasks\n",
"The first method is using [`asyncio.create_task()`](https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task). This function takes in a coroutine, immediately schedules it to happen, and immediately returns a [`task`](https://docs.python.org/3/library/asyncio-task.html#asyncio.Task) object. You can `await` the resulting `task` for a return value if you so desire, or you can call `task.cancel()` to stop the task from running. \n",
"\n",
"Tasks are commonly used to create a process running in the background- I want something to happen, but I don't care too much about the result of it. \n",
"\n",
"For example:"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:24:30.443448Z",
"start_time": "2021-03-09T05:24:30.433946Z"
}
},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "894eefc1ba444f2eb035c249e363fa4a",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"Output(layout=Layout(border='1px solid black'))"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"out = widgets.Output(layout={'border': '1px solid black'})\n",
"display(out)\n",
"\n",
"# printing inside tasks is a little funny in jupyter, so we have to use this widget dance\n",
"\n",
"async def my_task():\n",
"    await asyncio.sleep(3)\n",
"    with out:\n",
"        print('later')\n",
"\n",
"asyncio.create_task(my_task())\n",
"with out:\n",
"    print('now')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Despite \"my_task()\" having been created and scheduled *before* we print `now`, `later` appears second. This is because we've scheduled it to happen \"in the background\" and continued with our program.\n",
"\n",
"If we were to put a loop in our task, it could potentially run forever, which is useful for doing things repeatedly in the background.\n",
"```py\n",
"async def every_ten_seconds():\n",
"    while True:\n",
"        await asyncio.sleep(10)\n",
"        await something()\n",
"\n",
"task = asyncio.create_task(every_ten_seconds())\n",
"```\n",
"\n",
"We could then cancel this task by calling `task.cancel()`\n",
"\n",
"#### task exceptions\n",
"It's worth noting that errors in a task will raise somewhat silently- until we get the return value. Consider the following code:"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:24:34.400795Z",
"start_time": "2021-03-09T05:24:34.396295Z"
}
},
"outputs": [
{
"data": {
"text/plain": [
"<Task pending name='Task-87' coro=<raise_error() running at <ipython-input-15-b18badc984ba>:1>>"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"async def raise_error():\n",
"    raise Exception()\n",
"\n",
"asyncio.create_task(raise_error())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This will silently error in the background. We can use `task.add_done_callback()` if we want it to visibly error, as shown here, or we could await the result of the task.\n",
"```py\n",
"def exception_catching_callback(task):\n",
"    # task.exception() itself raises CancelledError on a cancelled task, so check that first\n",
"    if not task.cancelled() and task.exception():\n",
"        task.print_stack()\n",
"\n",
"my_task = loop.create_task(raise_error())\n",
"my_task.add_done_callback(exception_catching_callback)\n",
"```"
]
},
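{
"cell_type": "markdown",
"metadata": {},
"source": [
"The other option mentioned above, awaiting the task, can be sketched roughly like this. It is written as a standalone script (hence `asyncio.run()`); the `ValueError` and its message are just illustrative choices.\n",
"\n",
"```py\n",
"import asyncio\n",
"\n",
"async def raise_error():\n",
"    raise ValueError('boom')\n",
"\n",
"async def main():\n",
"    task = asyncio.create_task(raise_error())\n",
"    try:\n",
"        # Awaiting the task re-raises whatever exception it failed with.\n",
"        await task\n",
"    except ValueError as exc:\n",
"        return f'caught: {exc}'\n",
"    return 'no error'\n",
"\n",
"if __name__ == '__main__':\n",
"    print(asyncio.run(main()))\n",
"```\n",
"\n",
"In a notebook you would `await main()` directly rather than calling `asyncio.run()`."
]
},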
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### asyncio.gather()\n",
"The second common thing we might want to do is run multiple coroutines at once *and* get the result. We can do that with [`asyncio.gather().`](https://docs.python.org/3/library/asyncio-task.html#running-tasks-concurrently) \n",
"\n",
"`gather` takes in any number of awaitables, runs them all at the same-ish time, waits for them all to complete, and returns all the results at once as a list.\n",
"\n",
"This was used earlier in the web request examples."
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:24:47.239874Z",
"start_time": "2021-03-09T05:24:45.248308Z"
}
},
"outputs": [
{
"data": {
"text/plain": [
"['s', 'o', 'm', 'e', ' ', 'w', 'o', 'r', 'd', 's']"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"async def my_coroutine(args):\n",
"    await asyncio.sleep(2)\n",
"    return args\n",
"\n",
"results = await asyncio.gather(*[my_coroutine(x) for x in 'some words'])\n",
"results"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can use this to make an extremely silly time-based sorting function:"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:24:56.279122Z",
"start_time": "2021-03-09T05:24:47.271878Z"
}
},
"outputs": [
{
"data": {
"text/plain": [
"[1, 2, 3, 3, 5, 5, 6, 7, 9]"
]
},
"execution_count": 17,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"unsorted = [5, 7, 3, 5, 1, 2, 3, 6, 9]\n",
"sorted_ = []\n",
"\n",
"async def sorting_key(arg):\n",
"    await asyncio.sleep(arg)\n",
"    sorted_.append(arg)\n",
"\n",
"await asyncio.gather(*[sorting_key(x) for x in unsorted])\n",
"sorted_"
]
},
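{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that the *order* in which the tasks inside `gather` run is not guaranteed; they all happen \"together\". The returned list of results, however, always matches the order of the awaitables passed in."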
]
},
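{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make the ordering point concrete, here is a small sketch written as a standalone script. The helper name `sleep_then_return` and the delays are made up for illustration.\n",
"\n",
"```py\n",
"import asyncio\n",
"\n",
"finished = []  # records the order in which the coroutines actually finish\n",
"\n",
"async def sleep_then_return(delay):\n",
"    await asyncio.sleep(delay)\n",
"    finished.append(delay)\n",
"    return delay\n",
"\n",
"async def main():\n",
"    # gather returns results in the order the awaitables were passed in,\n",
"    # regardless of which one finished first.\n",
"    return await asyncio.gather(*[sleep_then_return(d) for d in (0.3, 0.1, 0.2)])\n",
"\n",
"if __name__ == '__main__':\n",
"    results = asyncio.run(main())\n",
"    print('results: ', results)   # same order as the input delays\n",
"    print('finished:', finished)  # completion order, shortest delay first\n",
"```\n",
"\n",
"This is the same trick the silly sorting function relies on: completion order follows the sleep times, while the return value of `gather` follows the argument order."
]
},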
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Other methods:\n",
"Left out for brevity are [`asyncio.wait_for`](https://docs.python.org/3/library/asyncio-task.html#asyncio.wait_for), which lets you set a timeout on an awaitable, and [`asyncio.wait`](https://docs.python.org/3/library/asyncio-task.html#waiting-primitives), which runs multiple awaitables at once and can, among other things, return as soon as the first one completes."
]
},
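{
"cell_type": "markdown",
"metadata": {},
"source": [
"For a taste of `asyncio.wait_for`, here is a minimal timeout sketch, again as a standalone script. The coroutine `slow` and the timings are invented for the example.\n",
"\n",
"```py\n",
"import asyncio\n",
"\n",
"async def slow():\n",
"    await asyncio.sleep(10)\n",
"    return 'finished'\n",
"\n",
"async def main():\n",
"    try:\n",
"        return await asyncio.wait_for(slow(), timeout=0.1)\n",
"    except asyncio.TimeoutError:\n",
"        # wait_for cancels the inner coroutine once the timeout expires.\n",
"        return 'timed out'\n",
"\n",
"if __name__ == '__main__':\n",
"    print(asyncio.run(main()))\n",
"```\n",
"\n",
"Because the timeout is far shorter than the sleep, this takes the `except` branch almost immediately instead of waiting ten seconds."
]
},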
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Synchronization\n",
"Asyncio has several [Synchronization Primitives](https://docs.python.org/3/library/asyncio-sync.html). We will be covering 3: `Lock`, `Event` and `Semaphore`. \n",
"### Locks\n",
"[Lock](https://docs.python.org/3/library/asyncio-sync.html#lock)s are the most basic of sync methods: they allow for exclusive access to a resource, to ensure that only one coroutine is doing *something* with *something*. This might be done with, say, a file or a database.\n",
"\n",
"Basic usage looks like this:\n",
"\n",
"```py\n",
"lock = asyncio.Lock()\n",
"\n",
"async def do_db_op():\n",
"    async with lock:\n",
"        database.execute(something)\n",
"\n",
"await asyncio.gather(do_db_op(), do_db_op())\n",
"```\n",
"In this example, `database` can only have one thing calling execute at once. So, we make a lock and have `do_db_op` acquire the lock before continuing. One of the tasks we create will get the lock and do its operation; the second will wait its turn until the first task has released the lock, and then do the db op.\n",
"### Events\n",
"[Events](https://docs.python.org/3/library/asyncio-sync.html#event) are similar to locks in that tasks wait on them, but once an event is set, *all* waiting tasks are let through at once. For example, we might want to set up a database connection before we allow some coroutines to run, or we might wait for authentication to complete before continuing. Another example might be waiting for a cache to populate from a database, while still allowing some tasks to continue.\n",
"\n",
"An event is defined as `asyncio.Event()` and has a handful of important methods. <br>\n",
"*coroutine* `wait()` waits for the event to be `set`. A `set` event is \"released\" and will allow the code to continue. <br>\n",
"`set()` sets the event, which allows all tasks waiting for the event to continue. <br>\n",
"`clear()` \"locks\" the event again, so any task that calls `wait()` has to wait until it is next `set`.<br>\n",
"`is_set()` returns the state of the `Event`\n",
"\n",
"In the following example, note that the two running instances of `my_task()` will not print `\"hello! <number>\"` until we run the cell after it, which releases the event."
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:24:58.145725Z",
"start_time": "2021-03-09T05:24:58.136723Z"
}
},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "f65cda73274c47c6be1801d84a98b806",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"Output(layout=Layout(border='1px solid black'))"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"<Task pending name='Task-110' coro=<my_task() running at <ipython-input-18-516bff1bf6f4>:5>>"
]
},
"execution_count": 18,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"out = widgets.Output(layout={'border': '1px solid black'})\n",
"display(out)\n",
"event = asyncio.Event()\n",
"\n",
"async def my_task(number):\n",
"    with out:\n",
"        print('waiting!', number)\n",
"    await event.wait()\n",
"    with out:\n",
"        print('hello!', number)\n",
"\n",
"asyncio.create_task(my_task(1))\n",
"asyncio.create_task(my_task(2))"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:24:58.790320Z",
"start_time": "2021-03-09T05:24:58.787320Z"
}
},
"outputs": [],
"source": [
"event.set()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Semaphores\n",
"[`Semaphores`](https://docs.python.org/3/library/asyncio-sync.html#semaphore) are a type of lock, but they allow a specific number of tasks access to a resource at once. For example, if you have some API that allows up to 10 requests to be active at once, you could use a semaphore to bound it. A `Semaphore` has a default value of 1, which makes it functionally identical to a lock by default. Only by raising the value can we use it more flexibly.\n",
"\n",
"In the following example, we create 6 tasks using a similar method as before, but instead of an event we bind them to a semaphore that allows 2 tasks to work at once. Thus, tasks will be completed in chunks of 2."
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:25:10.968902Z",
"start_time": "2021-03-09T05:25:04.959600Z"
}
},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "fd80d13aa02d4273a5f13e78b1b342d8",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"Output(layout=Layout(border='1px solid black'))"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"[None, None, None, None, None, None]"
]
},
"execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"out = widgets.Output(layout={'border': '1px solid black'})\n",
"display(out)\n",
"sem = asyncio.Semaphore(2)\n",
"\n",
"async def my_task(number):\n",
"    with out:\n",
"        print('waiting!', number)\n",
"    async with sem:\n",
"        await asyncio.sleep(2)\n",
"        with out:\n",
"            print('hello!', number)\n",
"\n",
"await asyncio.gather(*[my_task(n) for n in range(6)])"
]
},
{
"cell_type": "markdown",
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T00:38:56.362914Z",
"start_time": "2021-03-09T00:38:56.360912Z"
}
},
"source": [
"# Queues\n",
"Finally we'll touch on [`queues`](https://docs.python.org/3/library/asyncio-queue.html#queue) a bit. If you're familiar with threading queues, these operate in much the same fashion. Put things in on one end, and things pop out on the other end (by default; there are more exotic queues out there).\n",
"\n",
"Queues are used to... queue things. You might use one to create a playlist of songs, for example. Other tasks can add to the queue, and you would have one task that operates on the queue.\n",
"\n",
"A queue can have a limit as well, to cap the number of items. Depending on the method used, it will either raise or wait when you try to add to a full queue. \n",
"\n",
"Selected methods:\n",
"\n",
"| Method | Explanation |\n",
"| :-- | :-- |\n",
"| *coroutine* `get()` | This coroutine waits until an item is in the queue, and returns it |\n",
"| `get_nowait()` | This method immediately gets an item, and returns it. If the queue is empty, it raises `QueueEmpty` |\n",
"| *coroutine* `join()` | This coroutine waits until every item put into the queue has been received and marked done with `task_done()` |\n",
"| *coroutine* `put()` | This coroutine adds an item to the queue, and if the queue is full, waits until it can |\n",
"| `put_nowait()` | This method adds an item to a queue without waiting. It raises `QueueFull` if the queue is full |\n",
"| `task_done()` | This method should be called after you are finished with whatever you received with `get()` |\n",
"\n",
"Notice how some of these methods aren't coroutines. This forms a convenient way to interface between synchronous and asynchronous sections of your code."
]
},
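{
"cell_type": "markdown",
"metadata": {},
"source": [
"To show how `put()`, `get()`, `task_done()` and `join()` fit together, here is a minimal producer/worker sketch as a standalone script. The names `worker` and `processed`, and the doubling \"work\", are made up for illustration.\n",
"\n",
"```py\n",
"import asyncio\n",
"\n",
"async def worker(queue, processed):\n",
"    while True:\n",
"        item = await queue.get()\n",
"        processed.append(item * 2)  # stand-in for real work\n",
"        queue.task_done()  # tells join() that this item is finished\n",
"\n",
"async def main():\n",
"    queue = asyncio.Queue(maxsize=2)\n",
"    processed = []\n",
"    worker_task = asyncio.create_task(worker(queue, processed))\n",
"    for item in range(5):\n",
"        await queue.put(item)  # waits whenever the queue is full\n",
"    await queue.join()  # returns once every item has been marked done\n",
"    worker_task.cancel()  # the worker loops forever, so stop it explicitly\n",
"    return processed\n",
"\n",
"if __name__ == '__main__':\n",
"    print(asyncio.run(main()))\n",
"```\n",
"\n",
"The small `maxsize` is deliberate: it forces `put()` to wait for the worker, which is the waiting behaviour described in the table above."
]
},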
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In the following example, we define a queue and a worker task. The worker task will wait for something to appear in the queue, and when it has, it will \"work\" on it- in this case, sleeping. \n",
"\n",
"The cells following it create buttons to add to the queue- note how `put_nowait()` is not a coroutine, and does not need to be awaited. Observe the different behaviors when the queue's max size is reached between the two options. Also note that because `put_nowait()` is a sync function, it's very easy to add as a callback to our button- while the coroutine `put` was more trouble than it was worth at the time of writing."
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:25:11.008908Z",
"start_time": "2021-03-09T05:25:11.000407Z"
}
},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "21ded978ce8d4225af9ded679fab8272",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"Output(layout=Layout(border='1px solid black'))"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"out = widgets.Output(layout={'border': '1px solid black'})\n",
"display(out)\n",
"queue = asyncio.Queue(maxsize=10)\n",
"\n",
"async def sleepy_worker():\n",
"    while True:\n",
"        sleep_for = await queue.get()\n",
"\n",
"        await asyncio.sleep(sleep_for)\n",
"\n",
"        with out:\n",
"            print('Slept for', sleep_for)\n",
"        queue.task_done()\n",
" \n",
"worker_task = asyncio.create_task(sleepy_worker())"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:25:11.144932Z",
"start_time": "2021-03-09T05:25:11.044415Z"
}
},
"outputs": [],
"source": [
"await queue.put(3)"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:25:12.296161Z",
"start_time": "2021-03-09T05:25:12.287660Z"
}
},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "966dc3675f8443fa8781c320f25a7d56",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"Button(description='Click me', style=ButtonStyle())"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"button = widgets.Button(description='Click me')\n",
"button.on_click(lambda _ : queue.put_nowait(2))\n",
"display(button)"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:25:14.153408Z",
"start_time": "2021-03-09T05:25:14.150907Z"
}
},
"outputs": [
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 24,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"worker_task.cancel()\n",
"# As this task works \"forever\", we should close it!"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Threads\n",
"\n",
"Full usage of threads and asyncio falls a bit outside of the scope of this. For more detailed information, you can read [this section](https://docs.python.org/3/library/asyncio-dev.html#concurrency-and-multithreading) of the documentation. \n",
"\n",
"However, sometimes we need to call blocking code, or just thread something and get the result. Asyncio provides a very tidy interface to do this in the form of [`run_in_executor()`.](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor) \n",
"\n",
"An executor, broadly speaking, is a class that runs code elsewhere. Two are included in the standard library's `concurrent.futures` module, `ThreadPoolExecutor` and `ProcessPoolExecutor`, which run a function in a thread and a subprocess respectively. The standard rules about threading and multiprocessing still apply.\n",
"\n",
"The first argument passed is the executor; if `None` is passed, the loop's default executor (a `ThreadPoolExecutor`) is used.\n",
"### Thread safety\n",
"The vast majority of async-related methods and objects are *not threadsafe*.\n",
"\n",
"Basic example:"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:28:05.399631Z",
"start_time": "2021-03-09T05:28:02.389132Z"
}
},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "286ee06e0e164189be06b865dc55cff6",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"Output(layout=Layout(border='1px solid black'))"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"import time\n",
"out = widgets.Output(layout={'border': '1px solid black'})\n",
"display(out)\n",
"\n",
"def blocking_task(time_):\n",
"    time.sleep(time_)\n",
"    with out:\n",
"        print('hello!')\n",
"\n",
"async def non_blocking_task():\n",
"    await asyncio.sleep(1)\n",
"    with out:\n",
"        print('aloha!')\n",
"\n",
"loop = asyncio.get_event_loop()\n",
"asyncio.create_task(non_blocking_task())\n",
"await loop.run_in_executor(None, blocking_task, 3)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Notice how the non-blocking task was still able to run. This is detailed more in the next section.\n",
"\n",
"If you need to pass kwargs to the function being executed, you can use [`functools.partial`](https://docs.python.org/3/library/functools.html#functools.partial) like so:"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:28:39.462083Z",
"start_time": "2021-03-09T05:28:36.457572Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"hello!\n"
]
}
],
"source": [
"import functools\n",
"def blocking_task(time_, *, word):\n",
"    time.sleep(time_)\n",
"    print(word)\n",
"\n",
"partial = functools.partial(blocking_task, 3, word='hello!')\n",
" \n",
"await loop.run_in_executor(None, partial)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Blocking\n",
"Blocking is the big bad nasty you can get into with async programming. \n",
"\n",
"**Any function that does not return within 100ms or yield back to the event loop is *blocking***\n",
"\n",
"What does this mean? Why does it matter? Asyncio, being a concurrency framework on a single thread, relies on context switches to achieve concurrency, and it can only switch contexts on an await. If you *prevent* it from doing so, nothing else can work at that time. \n",
"\n",
"In the following example, we create a loop that posts a message every second. We then sleep for ten seconds using the *blocking* `time.sleep` instead of the *non-blocking* `asyncio.sleep`, and we can observe that the task is unable to accomplish its work. "
]
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:28:53.896633Z",
"start_time": "2021-03-09T05:28:40.889487Z"
}
},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "f4a0486cc4474dc6b3882548eb66c3d5",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"Output(layout=Layout(border='1px solid black'))"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"out = widgets.Output(layout={'border': '1px solid black'})\n",
"display(out)\n",
"\n",
"import datetime\n",
"\n",
"async def print_over_time():\n",
" for x in range(10):\n",
" with out:\n",
" print('hello', x)\n",
" await asyncio.sleep(1)\n",
"\n",
"asyncio.create_task(print_over_time())\n",
"\n",
"await asyncio.sleep(3)\n",
"with out:\n",
" print('starting to block!', datetime.datetime.now().strftime('%H:%M:%S'))\n",
" time.sleep(10)\n",
" print('finished blocking!', datetime.datetime.now().strftime('%H:%M:%S'))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Common blocking operations include, but are not limited to:\n",
"- `time.sleep()` \n",
"- Heavy math- `sum(i * i for i in range(10 ** 7))` for example\n",
"- blocking IO libraries such as `requests`, `urllib`, `psycopg2`\n",
"- Obscured heavy math such as `PIL` \n",
"- Loading extremely large files\n",
"- Loops that do not yield back at some point\n",
"\n",
"Much of the time you alleviate blocking code- for example, if we need to do heavy C-lib'd math (like `PIL`, `numpy`, `tensorflow`), we can use `run_in_executor` to run it in another thread, and wait for a returned response. Other times, if it's *pure python* blocking code that cannot be otherwise amerliorated, you should use the `ProcessPoolExecutor` with `run_in_executor`\n",
"\n",
"Other times, such as with a loop that does not yield back to the event loop, you can force a yield by dropping an `await asyncio.sleep(0)` which allows for a context switch.\n",
"\n",
"**When avaliable, an asyncronous library should always be used over a syncronous one in an executor.** \n",
"\n",
"**It is imparative to avoid blocking in an asyncronous program!**"
]
},
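{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make the `asyncio.sleep(0)` trick concrete, here is a small sketch (the names `ticker`, `sum_squares`, and `yield_demo` are invented for this example). A long pure-Python loop yields every `yield_every` iterations, which gives another task a chance to run before the loop finishes. Run it with `await yield_demo()`; the yield interval is an arbitrary choice you would tune for real work."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import asyncio\n",
"\n",
"async def ticker(log, count=3):\n",
"    # records a tick each time the event loop gives this task a turn\n",
"    for i in range(count):\n",
"        log.append(f'tick {i}')\n",
"        await asyncio.sleep(0)\n",
"\n",
"async def sum_squares(n, log, yield_every=1000):\n",
"    # pure-Python loop that would block the loop if it never awaited\n",
"    total = 0\n",
"    for i in range(n):\n",
"        total += i * i\n",
"        if i % yield_every == 0:\n",
"            await asyncio.sleep(0)  # hand control back to the event loop\n",
"    log.append('sum done')\n",
"    return total\n",
"\n",
"async def yield_demo():\n",
"    log = []\n",
"    tick_task = asyncio.create_task(ticker(log))\n",
"    total = await sum_squares(10_000, log)\n",
"    await tick_task\n",
"    # the ticks land in the log before 'sum done' because the loop kept yielding\n",
"    return total, log"
]
},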
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Debug Mode\n",
"Asyncio has a [debug mode](https://docs.python.org/3/library/asyncio-dev.html#debug-mode) that amung other things, warns you when things are blocking, when you failed to await a coroutine, or when you call something non-threadsafe across threads."
]
},
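{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a rough sketch of what that looks like in practice, the cell below turns debug mode on for the running loop, deliberately blocks for longer than `loop.slow_callback_duration` (0.1 seconds by default), and then restores the previous setting. The `debug_demo` name and the `block_for` parameter are made up for illustration. With debug mode on, asyncio should report the slow step through the `asyncio` logger; whether you actually see it depends on your logging configuration. Run it with `await debug_demo()`. Outside a notebook, `asyncio.run(main(), debug=True)` or the `PYTHONASYNCIODEBUG=1` environment variable enable the same mode."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import asyncio\n",
"import time\n",
"\n",
"async def debug_demo(block_for=0.2):\n",
"    loop = asyncio.get_running_loop()\n",
"    was_debug = loop.get_debug()\n",
"    loop.set_debug(True)\n",
"    try:\n",
"        # yield once so the next step of this task is timed by the loop\n",
"        await asyncio.sleep(0)\n",
"        time.sleep(block_for)  # deliberately block past the threshold\n",
"        await asyncio.sleep(0)  # the step ends here, so its duration can be checked\n",
"    finally:\n",
"        loop.set_debug(was_debug)  # put the loop back the way we found it\n",
"    return loop.slow_callback_duration"
]
},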
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Making web requests\n",
"\n",
"\n",
"## aiohttp\n",
"[aiohttp](https://docs.aiohttp.org/en/stable/) is the most mature and well-documented web libary for async enviroments- it's very similar to `requests` and mirrors it's API in many places. It, unlike requests, has webserver websocket modules included, and is implemented in C.\n",
"\n",
"In terms of general usage, the primary difference you'll experience with it and requests is that you must explicitly create an [`aiohttp.Clientsession()`](https://docs.aiohttp.org/en/stable/client_advanced.html#client-session) before doing any requests- unlike `requests` which has `requests.get` there is no `aiohttp.get`.\n",
"\n",
"Code roughly equivilant to `requests.get()` is as follows:"
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:28:53.971146Z",
"start_time": "2021-03-09T05:28:53.934640Z"
}
},
"outputs": [],
"source": [
"async with aiohttp.ClientSession() as session:\n",
" async with session.get('http://example.org') as response:\n",
" content = await response.read()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This works great for single calls, but if we're doing many calls it's a good idea to create a `ClientSession` and reuse it- as exampled below.\n",
"\n",
"Note that we must close the clientsession when complete, or it will throw a warning. Additionally, if a clientsession is created *outside* of a coroutine, it will also throw a warning. Bluntly, neither of these warnings matter too much as long as they happen only once in the lifespan of a program."
]
},
{
"cell_type": "code",
"execution_count": 33,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:28:54.479247Z",
"start_time": "2021-03-09T05:28:54.006654Z"
}
},
"outputs": [],
"source": [
"session = aiohttp.ClientSession()\n",
"\n",
"async with session.get('http://example.org') as response:\n",
" pass\n",
"async with session.get('http://google.com') as response:\n",
" pass\n",
"\n",
"await session.close()"
]
},
{
"cell_type": "markdown",
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T04:52:01.708697Z",
"start_time": "2021-03-09T04:52:01.682692Z"
}
},
"source": [
"Additionally, when you make a request with aiohttp, only the response headers are initially retreived. Only when you `await` the various content-reading methods will it begin to load content. \n",
"\n",
"### Further readings\n",
"Aiohttp cannot be totally covered in this document, but it's rather extensive documentation for the client can be found [here,](https://docs.aiohttp.org/en/stable/client.html#client) as well as the documentation for it's other modules such as the webserver and websocket client.\n",
"\n",
"## httpx\n",
"[httpx](https://www.python-httpx.org/) is a still fairly immature python module, and I would not mention it barring the fact that it supports `http/2`, unlike aiohttp which does not."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Practical Example"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Vestigial example\n",
"\n",
"The following code snippet creates an API server and requests something from it- I decided not to flesh this out further, but it's interesting so I'm leaving it here."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:18:36.954745Z",
"start_time": "2021-03-09T05:18:36.943244Z"
},
"scrolled": true
},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "c8614ae79ca14b4782faf8fec2ef1ef0",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"Button(description='Stop Server', style=ButtonStyle())"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"from aiohttp import web\n",
"\n",
"app = web.Application()\n",
"routes = web.RouteTableDef()\n",
"\n",
"@routes.get('/')\n",
"async def root_get_handler(requests):\n",
" return web.json_response({'message':'hello'})\n",
"\n",
"app.add_routes(routes)\n",
"\n",
"web_task = asyncio.create_task(web._run_app(app)) \n",
"# normally you use web.run_app but we need a non-blocking version\n",
"\n",
"def shutdown_server(button):\n",
" web_task.cancel()\n",
" button.description='Server stopped'\n",
" button.disabled=True\n",
" \n",
" \n",
"close_button = widgets.Button(description='Stop Server')\n",
"close_button.on_click(lambda _:shutdown_server(close_button))\n",
"display(close_button)"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"ExecuteTime": {
"end_time": "2021-03-09T05:17:25.951762Z",
"start_time": "2021-03-09T05:17:23.944411Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"b'{\"message\": \"hello\"}'\n"
]
}
],
"source": [
"async with aiohttp.ClientSession() as session:\n",
" async with session.get('http://localhost:8080/') as response:\n",
" content = await response.read()\n",
" print(content)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.5"
},
"varInspector": {
"cols": {
"lenName": 16,
"lenType": 16,
"lenVar": 40
},
"kernels_config": {
"python": {
"delete_cmd_postfix": "",
"delete_cmd_prefix": "del ",
"library": "var_list.py",
"varRefreshCmd": "print(var_dic_list())"
},
"r": {
"delete_cmd_postfix": ") ",
"delete_cmd_prefix": "rm(",
"library": "var_list.r",
"varRefreshCmd": "cat(var_dic_list()) "
}
},
"types_to_exclude": [
"module",
"function",
"builtin_function_or_method",
"instance",
"_Feature"
],
"window_display": false
}
},
"nbformat": 4,
"nbformat_minor": 4
}