@AvnerBen

AvnerBen/blog.md Secret

Created October 16, 2021 18:23

Parallel Programming in Python Lesson 6. Cooperative programming - asynchronous

This is the sixth and last in a series of lessons, covering the various facilities that the Python programming language offers for parallel programming, and the motivation for using each of them. In the previous lessons, we have explored the applicative requirements for event-driven design, and have learned to distinguish the ones that indeed require parallel code. We discussed the common producer/consumer use case and explored various implementations of it: multithreading, multiprocessing and cooperative programming. The latter replaces the old-school separation of parallel logic to threads / processes with cooperation of coroutines, interleaving the synchronization-requiring activities within a single thread of control. In this last lesson, we shall consider Python's support for the special (and important) case of leaving the execution of these coroutines (or any function) open to dispatch.

Contents:

  1. Motivation for dispatched coroutines
  2. Producer/consumer via asynchronous queue
  3. Consumer sets the pace
  4. Asynchronous iteration
  5. Additional functionality

TITLE: 1. Motivation for dispatched coroutines

Earlier in this series, we discussed the use case of two (or more) programmatic tasks - designated "consumer" and "producer" - that lived in parallel and required synchronization in order to function properly. The one task consumed input that - not incidentally - happened to be the output produced by the other task and vice versa. In the "classical" multithreading/multiprocessing solution (discussed in lessons 3 and 4, respectively) each task is launched on its separate way, plus the overhead of synchronization machinery (locks, events, queues) that must be consulted before access to the common resource on both sides. For example, to make sure that consuming the input does not interfere with producing the output (which in our example, are one and the same object), to make sure that the input is indeed there for consumption, to make sure that the output is indeed delivered after production, etc. And then, the coroutine solution (discussed in lesson 5) manages to keep both tasks in the same thread of control, interleaving the input and output steps, using the coroutine protocol, needing no explicit synchronization. A single thread of control may do, because the consumer and producer tasks (as well as most other cases of tasks in our universe that invite parallelism), involve no truly simultaneous steps. They are never required to perform any two atomic steps at exactly the same time!

But the simplicity of the coroutine solution comes with a price tag. The coroutine solution works in quanta of function calls (or implicit coroutine reentries, which are actually implemented by a function call - the magic method __next__, etc). And a function call blocks everything in the path of control leading to it (unlike a separate thread, which may or may not block itself, leaving the other threads running as usual). Functions block their caller until they return, even if they do little or nothing at all (as far as the CPU is concerned) in the meantime. Two obvious examples are functions that do I/O (which involves waiting idly for the hardware) and functions that sleep. In the multithreaded solution, such sluggish functions did not prevent their peers (in the other threads) from going. But now that we have squeezed the entire parallel functionality into a single thread, it seems we shot ourselves in the foot! To complete the multithreading impersonation, it would be nice if we could somehow persuade the functions that block us unnecessarily to step aside and let us use this time window - but while still remaining in the same thread of control!

The secret of the dispatched-coroutine mechanism is that, at any moment, many function calls may be logically blocked, but this still happens within a single thread of control (although the latter can only physically block one of them!)

The common structured way to achieve this feat is to forward the execution of potentially-idling functions (that may be safely delayed) to a (global) dispatcher, which schedules their execution. Programmatically, this means to await their execution (which blocks the awaiting caller). To make the miracle come true, we may start the program by scheduling several such functions "simultaneously" (in one request to the dispatcher). What we get is a control flow where, while each awaiting function along the way is indeed blocked, the order of their execution is undefined! (Since the dispatcher may, at any time, have accumulated many await requests and release them at will). Because the program originally scheduled several paths of control, to be selected in an order chosen by the dispatcher, which may further schedule more function calls to select from, etc, each time an awaiting function call is dispatched, the program may resume another logical path of control (that has been practically suspended). The secret of the dispatched-coroutine mechanism is that, at any moment, more than one function call may be logically blocked (just as in multithreading, where at any moment, more than one thread - i.e. a function call within it - may be blocked), but this still happens within a single thread of control (although the latter can physically block only one function call!). This scenario, indeed, resembles multi-threading (minus simultaneity) so much, that some people call such programmatic tasks green threads (the "green" suggesting economy, I guess).
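The interleaving described above can be sketched in a few lines. This is a minimal illustration, not code from the series: the `worker` function and its log are hypothetical names, and `asyncio.sleep(0)` stands in for any awaited step that hands control back to the dispatcher.

```python
import asyncio


async def worker(name, steps, log):
    """A hypothetical 'green thread': record a step, then step aside."""
    for i in range(1, steps + 1):
        log.append(f"{name}{i}")
        # Awaiting suspends this coroutine and returns control to the
        # event loop, which may resume another suspended coroutine.
        await asyncio.sleep(0)


async def main():
    log = []
    # Both coroutines are handed to the dispatcher in a single request.
    await asyncio.gather(worker("a", 3, log), worker("b", 3, log))
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The entries of the two workers come out interleaved, rather than one worker running to completion before the other starts, although everything happens in one thread.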

While such a dispatcher mechanism may perhaps be written ad-hoc, using common procedural building blocks, having a language-supported mechanism takes care of many small and not-so-small details, and its globality ensures that all await requests (including those made by third-party code) will indeed be reckoned. In Python, this language facility is called async-IO (the built-in asyncio library) and involves the keywords async def for a schedulable function (including generator) and await for scheduling such a function. Iterator coroutine reentry is supported with async for. One may launch the event loop (dispatcher) explicitly (or make do with the default). To stress the resemblance to multi-threading, async-IO supports awaitable synchronization objects - lock, queue and event - as well as an awaitable sleep function. In addition, the built-in async-IO library comes with a useful networking API.
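As a small illustration of the awaitable synchronization objects mentioned above, here is a sketch using `asyncio.Event`. The `waiter` and `setter` coroutines are hypothetical names invented for this example.

```python
import asyncio


async def waiter(event, log):
    log.append("waiter: waiting")
    await event.wait()  # suspends this coroutine only, not the thread
    log.append("waiter: released")


async def setter(event, log):
    log.append("setter: setting")
    event.set()  # a plain method call: set() is not awaitable


async def main():
    event = asyncio.Event()
    log = []
    await asyncio.gather(waiter(event, log), setter(event, log))
    return log


if __name__ == "__main__":
    for line in asyncio.run(main()):
        print(line)
```

While the waiter is logically blocked on the event, the setter gets its turn in the same thread and releases it.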

Note that the term "async" (short for "asynchronous") is used here in a peculiar way. So far in this series, we have been using the term asynchronous function invocation in the sense of non-blocking. The function is launched on a fresh call stack (typically opening another thread or process), leaving the caller free to proceed on the original call stack, resulting in parallelism. The two paths of control proceed to unfold in parallel. (Consider the launching metaphor, as in launching a ship to sea. The ship proceeds to have its own sea voyage. We remain on shore to proceed with our business. While we may be responsible for the ship and the cargo it should return, we are not physically on board the ship). So, the thread the caller has launched is going on its separate way. Whether the caller expects a result (and how to obtain it) is up to the caller. Viewed in this "classic" context, the async-IO facility, contrary to its name, is definitely synchronous. The caller is suspended and will only resume once the awaited coroutine returns. The difference between this awaited call and the common synchronous call is that the awaited call is deferred in time. It may take an unspecified time to start and, in the meantime, all other awaiting coroutines will be called one by one, and (unless you tamper with it) in the same order scheduled. New coroutines scheduled in the meantime will have to wait (for yours). The resemblance to multi-threading/processing (known as green threads) is virtual and conditional (i.e. in the eyes of the beholder) and will materialize only in the use case of scheduling more than one coroutine in batch at program start, and provided that each of these continues to schedule its critical steps as it unfolds.

In addition, async-IO may be exploited as a smart way to optimize performance (especially with respect to IO, indeed), and there is a deluge of literature about that. However, such usage, regardless of its popularity, is beyond the scope of this series, which is about parallel programming in Python.

Of course, in order to get the benefit of the IO in async-IO for functionality other than the built-in default, one must resort to a third-party library with awaitable functions (often prefixed by "aio" - e.g. for such functionality as web client, disk access, etc). Needless to say, the default built-in synchronous libraries will, of course, block us for good, which is not what we are looking for! Here, we do not want to really sleep(10) (blocking everything in the thread of control for ten seconds) - we rather wish to await sleep(10), freeing the next scheduled bunch of functions to act in the meantime and return to us after at least ten seconds. (At least, rather than exactly, because the mechanism is cooperative, as opposed to the preemptive mechanism of multithreading).
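The difference between really sleeping and awaiting a sleep can be measured. The sketch below is an illustration under assumed names (`blocking_nap`, `cooperative_nap` and `timed` are not from the series) and uses short naps so that it finishes quickly.

```python
import asyncio
import time


async def blocking_nap(seconds):
    # time.sleep() blocks the whole thread, event loop included.
    time.sleep(seconds)


async def cooperative_nap(seconds):
    # asyncio.sleep() suspends only this coroutine.
    await asyncio.sleep(seconds)


async def timed(nap, seconds=0.2, count=3):
    """Gather `count` naps and return the total elapsed time."""
    start = time.perf_counter()
    await asyncio.gather(*(nap(seconds) for _ in range(count)))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"blocking:    {asyncio.run(timed(blocking_nap)):.2f}s")
    print(f"cooperative: {asyncio.run(timed(cooperative_nap)):.2f}s")
```

The three blocking naps add up (roughly three times the nap length), while the three awaited naps overlap and take about as long as a single one.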

TITLE: 2. Producer/consumer via asynchronous queue

The following example refactors the Producer/Consumer multi-threaded queue of the respective example in Lesson Three, to an async-IO implementation. The three parallel main functions of this program - producer, consumer and timer - are implemented as three "green threads" that cooperate in the (physical) main thread of control, exchanging data through an asynchronous queue. All actions that involve waiting (get message, put message and suspend) are scheduled, freeing the other party to do its job in the meantime. (Output and footnotes below)

https://gist.github.com/a389513559d599d6d43edad2177ded06

Notes:

  1. The program imports the async-IO library.
  2. The producer is initialized using an async queue.
  3. The producer "green thread" sports a run function, which is async.
  4. The request from the queue to send next message is scheduled (freeing the consumer, if properly scheduled, to pick it up).
  5. The Producer.stop method is awaitable and takes the timeout.
  6. The Producer.stop method schedules its sleep time, freeing the producer and consumer to work in the mean time.
  7. Instead of blocking the whole thread for two seconds, the producer releases whoever it may concern to do their job for two seconds, for example, to let the consumer pick up the input, if not done yet.
  8. The consumer opens an async (awaitable) queue.
  9. The consumer is also a "green" thread.
  10. The request from the queue to get the next message is scheduled (actually, waiting for the producer to put it).
  11. The producer is handed the consumer's queue
  12. The three green threads in this application are awaited asynchronously. (Unlike a single await operation that blocks, here, the three green threads are "gathered" and handed to the event loop, which interleaves them in an order the program should not rely on).
  13. Creating the event loop and making sure it starts and stops correctly is left to the async-IO default.

Output:

Round 1
Round 2
Round 3
Round 4
Round 5
Round 6
Round 7
Round 8
Round 9
Round 10
[End of input]
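For readers who cannot open the gist, the structure described in the notes can be sketched roughly as follows. This is a reconstruction under assumptions, not the gist's actual code: the class layout, the `END` sentinel and the `interval`/`timeout` parameters are illustrative guesses modelled on the notes and on the output above.

```python
import asyncio

END = "[End of input]"  # assumed sentinel, modelled on the output above


class Producer:
    def __init__(self, queue, interval):
        self.queue = queue  # the consumer's async queue
        self.interval = interval
        self.running = True

    async def run(self):
        round_no = 0
        while self.running:
            round_no += 1
            await self.queue.put(f"Round {round_no}")
            # Step aside instead of blocking the whole thread.
            await asyncio.sleep(self.interval)
        await self.queue.put(END)

    async def stop(self, timeout):
        # The timer: sleeps cooperatively, then flips the flag.
        await asyncio.sleep(timeout)
        self.running = False


class Consumer:
    def __init__(self):
        self.queue = asyncio.Queue()
        self.received = []

    async def run(self):
        while True:
            message = await self.queue.get()  # waits for the producer
            self.received.append(message)
            print(message)
            if message == END:
                break


async def main(interval=2.0, timeout=20.0):
    consumer = Consumer()
    producer = Producer(consumer.queue, interval)
    # The three green threads are gathered and dispatched together.
    await asyncio.gather(producer.run(), consumer.run(), producer.stop(timeout))
    return consumer.received
```

Running it with `asyncio.run(main())` takes about twenty seconds with the assumed defaults; passing smaller values, such as `main(interval=0.05, timeout=0.12)`, shows the same pattern faster.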

Note that gathering the green threads for awaiting is essential. In contrast, awaiting each green thread consecutively will indeed do just that: first, the timer will execute immediately and stop us for twenty seconds (because nothing is standing in its way, as yet), then the producer will enter execution and block us indefinitely (there is no consumer yet) and then (theoretically), the consumer should enter execution and block indefinitely (there is no producer anymore). Needless to say, this is hardly the scenario we are expecting.
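The pitfall can be reproduced on a small scale. In the sketch below (hypothetical helper names, not the gist's code), a consumer awaited on its own never receives anything, so a timeout is used to escape; the same consumer gathered with a producer gets its message.

```python
import asyncio


async def consume_one(queue):
    return await queue.get()


async def produce_one(queue):
    await queue.put("Round 1")


async def consecutive():
    queue = asyncio.Queue()
    try:
        # Awaited alone: the producer has had no chance to run yet,
        # so this would wait forever without the timeout.
        return await asyncio.wait_for(consume_one(queue), timeout=0.1)
    except asyncio.TimeoutError:
        return "stuck"


async def gathered():
    queue = asyncio.Queue()
    # Gathered: while the consumer waits, the producer gets its turn.
    message, _ = await asyncio.gather(consume_one(queue), produce_one(queue))
    return message


if __name__ == "__main__":
    print(asyncio.run(consecutive()))
    print(asyncio.run(gathered()))
```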

The sequence diagram below shows the logic behind this design. Interestingly, the collaboration and sequence are precisely the same as in the pull iterator example of Lesson Five (see sequence diagram there). The only differences are aesthetic (from the designer's point of view):

  • The consumer loop is also parallelized. (Used to be the main thread).
  • The producer loop is awaited. (Used to be a thread).
  • The send and receive functions are awaited, implemented by asynchronous queue. (Used to be synchronized by Python's iteration mechanism).

The following implementation subtleties are not depicted in the sequence diagram: Although the order of launching the Consumer and Producer is undefined, the Producer.send will always precede Consumer.receive (because they are both awaitable). Even if Consumer.receive arrives first, it will be blocked until the Producer sends. And if Producer.send arrives first, it then waits for two seconds before exiting, which is more than enough for the Consumer to wake up and pull the message. Of course, a scenario that involves a very short response time (say, a few milliseconds or less) would invite an event (as in the Section Three example, below).

TITLE: 3. Consumer sets the pace

In this example, the consumer sets the pace, with the producer waiting by the queue, using the common Python queue protocol. (Output remains the same)

https://gist.github.com/68fcf5c1a7348f1297302e4b2be5d4fd

Notes:

  1. The producer waits for the queue event to signal (freeing the consumer to resume).
  2. The consumer signals the producer to resume. Note that not all of the async queue's methods are asynchronous!
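One way to realize this handshake is sketched below. It assumes that the "queue event" refers to the `join()` / `task_done()` pair of `asyncio.Queue`, which may differ from what the gist actually does; the function names, the `None` sentinel and the `pace` parameter are illustrative.

```python
import asyncio


async def producer(queue, rounds, log):
    for n in range(1, rounds + 1):
        await queue.put(f"Round {n}")
        await queue.join()  # suspend until the consumer calls task_done()
        log.append(f"producer resumed after round {n}")
    await queue.put(None)  # assumed end-of-input sentinel


async def consumer(queue, log, pace):
    while True:
        message = await queue.get()
        if message is None:
            queue.task_done()
            break
        log.append(message)
        await asyncio.sleep(pace)  # the consumer sets the pace
        queue.task_done()  # a plain method call, not awaited


async def main(rounds=3, pace=0.01):
    queue = asyncio.Queue()
    log = []
    await asyncio.gather(producer(queue, rounds, log), consumer(queue, log, pace))
    return log


if __name__ == "__main__":
    for line in asyncio.run(main()):
        print(line)
```

Here `put`, `get` and `join` are awaited, while `task_done` is an ordinary synchronous method.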

TITLE: 4. Asynchronous iteration

Python supports asynchronous pull iterators - async for. (But not asynchronous push iterators).

The following example refactors the multiple Producers / single Consumer example from Lesson Five to an async-IO implementation. Transferring the Consumer logic from thread to coroutine serves a didactic purpose. It does not improve performance and the order of the output will always be synchronous: "a", "b", "c". The only improvement is the removal of the thread.

https://gist.github.com/9b25ac62f07e2328dd0bd305ca3a2730

Notes:

  1. The magic iteration method is asynchronous and is called __aiter__ (rather than __iter__). It also returns an async_generator rather than an iterator.
  2. Inside, however, the inputs are obtained synchronously, in the obvious order. (This example does not utilize the full power of asynchronous iteration).
  3. The consumer run method must be asynchronous, because it uses async for.
  4. The for loop is asynchronous (using the Producer's __aiter__).
  5. Producer.stop is a state-changing setter and does not invite synchronization.

Output:

Round a1
Round b1
Round c1
Round a2
Round b2
Round c2
Round a3
Round b3
Round c3
Round a4
Round b4
Round c4
Round a5
Round b5
Round c5
[End of input]
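The shape of such an asynchronous pull iterator can be sketched as follows. This is an illustration under assumptions and not the gist's code: a single hypothetical `Producer` walks over the three input names, and its `__aiter__` is written as an async generator method.

```python
import asyncio


class Producer:
    def __init__(self, names, rounds):
        self.names = names
        self.rounds = rounds
        self.running = True

    def stop(self):
        # A plain state-changing setter; no synchronization involved.
        self.running = False

    async def __aiter__(self):
        # An async generator: calling it returns an async_generator object.
        for n in range(1, self.rounds + 1):
            for name in self.names:  # inputs taken in the obvious order
                if not self.running:
                    return
                yield f"Round {name}{n}"
            await asyncio.sleep(0)  # step aside between rounds


class Consumer:
    def __init__(self, producer):
        self.producer = producer
        self.received = []

    async def run(self):
        # async for is only allowed inside an async function.
        async for message in self.producer:
            self.received.append(message)
        self.received.append("[End of input]")


async def main():
    consumer = Consumer(Producer("abc", 2))
    await consumer.run()
    return consumer.received


if __name__ == "__main__":
    for line in asyncio.run(main()):
        print(line)
```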

TITLE: 5. Additional functionality

There are two ways to schedule a coroutine:

  1. Within an asynchronous function, using Python syntax to await-call it. (Some restrictions: You cannot await from module scope or from a synchronous function. You cannot await a lambda).
  2. At the root of the program (normally), to explicitly request the async-IO facility to schedule one or any number of coroutines. The recommended way (Python 3.7+, demonstrated here) is to request the async-IO facility to run the (one) main task, in which the green threads are gathered and awaited in bulk. The old-fashioned way (not demonstrated here) is to create the event loop (or select the default) and then request it to schedule the bunch of green threads, usually to completion (of all pending tasks). Alternatively, you can run the event loop forever and terminate it manually.
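The two ways of launching from the root of the program can be put side by side. In this sketch, `green_thread`, `run_recommended` and `run_old_fashioned` are hypothetical names; the asyncio calls themselves (`asyncio.run`, `new_event_loop`, `run_until_complete`) are part of the standard library.

```python
import asyncio


async def green_thread(name):
    await asyncio.sleep(0)
    return name


async def main():
    # The green threads are gathered and awaited in bulk.
    return await asyncio.gather(green_thread("a"), green_thread("b"))


def run_recommended():
    # Python 3.7+: asyncio.run creates the event loop and closes it.
    return asyncio.run(main())


def run_old_fashioned():
    # Explicit event loop management.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(main())
    finally:
        loop.close()


if __name__ == "__main__":
    print(run_recommended())
    print(run_old_fashioned())
```

Both functions return the gathered results in the order in which the coroutines were passed to `gather`.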

To be specific, the explicit scheduling request actually takes a task and returns a future (which will expose the returned value, when ready). Normally, one can ignore this low-level API, because the task will be built automatically when one supplies a coroutine (async function) or generator (function that yields; note that generator-based coroutines were deprecated in Python 3.8 and removed in 3.11) and the returned value is taken care of silently by the dispatcher. (But of course, you can create the task explicitly, where the default is too restrictive). It is also possible to accumulate scheduled functions one by one before running the event loop, using ensure_future (rather than deliver them all wrapped together, as in these examples).
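A short sketch of explicit tasks follows, assuming Python 3.7+ for `asyncio.create_task`. The `compute` coroutine is a hypothetical stand-in for any green thread that returns a value.

```python
import asyncio


async def compute(x):
    await asyncio.sleep(0)
    return x * 2


async def main():
    # Scheduled explicitly; the task starts running once we step aside.
    task = asyncio.create_task(compute(21))
    # The older API; accepts coroutines as well as futures.
    other = asyncio.ensure_future(compute(4))
    results = [await task, await other]
    # A finished task exposes the returned value through result().
    return results, task.done(), task.result()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Awaiting a task yields the value returned by the wrapped coroutine; the same value remains available from `task.result()` afterwards.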
