
@vxgmichel
Last active July 26, 2023 06:50
Go-style generators in asyncio

I've recently been looking into the Go concurrency model to see how it compares to asyncio.

An interesting concept caught my attention: Go generators.

Generators in Go

It roughly looks like this:

func count() <-chan int {
    c := make(chan int)
    go func() {
        for i := 0; ; i++ {
            c <- i
            time.Sleep(1 * time.Second)
        }
    }()
    return c
}
It's a function that spawns a goroutine and returns a channel to retrieve the data produced by the goroutine.

Channels are unbuffered by default, which means the producer and the consumer wait for each other to send and receive data, respectively. That's essentially a handshake.
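To make the handshake concrete in asyncio terms, here is a minimal sketch of an unbuffered channel built on asyncio primitives. The `Chan` class and its method names are mine, not part of asyncio, and it assumes a single producer and a single consumer:

```python
import asyncio

class Chan:
    """A minimal unbuffered channel: send() only returns once
    a receiver has actually taken the value (the handshake)."""

    def __init__(self):
        self._value = None
        self._ready = asyncio.Event()  # a value is waiting to be taken
        self._taken = asyncio.Event()  # the value has been received

    async def send(self, value):
        self._value = value
        self._taken.clear()
        self._ready.set()
        await self._taken.wait()  # block until the receiver takes it

    async def receive(self):
        await self._ready.wait()
        value = self._value
        self._ready.clear()
        self._taken.set()  # release the blocked sender
        return value
```

With this in place, `await chan.send(x)` on the producer side and `await chan.receive()` on the consumer side rendezvous exactly like an unbuffered Go channel.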

Asynchronous generators in asyncio

Asynchronous generators in python look very similar:

import asyncio
import itertools

async def count():
    for x in itertools.count():
        yield x
        await asyncio.sleep(1)

It's created the same way, and it also has the consumer waiting for the producer to yield a value.

However, there is a big difference: the asynchronous generator does not run in its own task; each producing operation is explicitly requested by the consumer through asynchronous iteration.

In order to get the same behavior in asyncio, we have to spawn a task to iterate over the generator and put the data in a channel. The consumer can then iterate asynchronously over the channel.

This and the handshake logic can be abstracted in a wrapper / decorator. The implementation is not so complicated, though it's not trivial either: aiogo.py
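The linked aiogo.py is the actual implementation; as a rough sketch of the idea (the names, the sentinel, and the use of a one-slot queue with `queue.join()` as the handshake are all mine, and this version skips the async-with support shown below), the wrapper might look like this:

```python
import asyncio

_DONE = object()  # sentinel signalling that the generator is exhausted

def gogenerator(func):
    """Run an async generator in its own task, Go-style, exposing
    its output as another async generator backed by a channel."""
    def wrapper(*args, **kwargs):
        agen = func(*args, **kwargs)
        queue = asyncio.Queue(1)

        async def pump():
            # Background task: iterate the generator and feed the queue.
            try:
                async for item in agen:
                    await queue.put(item)
                    await queue.join()  # handshake: wait until it's consumed
            finally:
                # Signal exhaustion; ignore if the consumer is gone.
                try:
                    queue.put_nowait(_DONE)
                except asyncio.QueueFull:
                    pass

        async def channel():
            task = asyncio.ensure_future(pump())
            try:
                while True:
                    item = await queue.get()
                    queue.task_done()
                    if item is _DONE:
                        break
                    yield item
            finally:
                task.cancel()

        return channel()
    return wrapper
```

Note how the handshake gives the anticipation for free: as soon as the consumer takes an item, `queue.join()` releases and the pump task starts producing the next one while the consumer is still processing.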

Example

As an example, consider the following code:

async def produce(n):
    for x in range(n):
        await asyncio.sleep(STEP)
        yield x

async def process(agen):
    async for x in agen:
        await asyncio.sleep(2*STEP)
        yield x

async def main():
    ait = process(produce(3))
    async for x in ait:
        print(x)

We're sending 3 items (0, 1 and 2) through a pipeline of two blocks:

  • the item is produced, takes 1 step: A0
  • the item is processed, takes 2 steps: B0 and B1

Now, here's what happens if regular async generators are used:

ITEMS : 0  1  2
STEP 0: A0 
STEP 1: B0 
STEP 2: B1
STEP 3:    A0 
STEP 4:    B0 
STEP 5:    B1 
STEP 6:       A0
STEP 7:       B0
STEP 8:       B1
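This sequential behavior is easy to verify by timing the pipeline. In the sketch below, STEP is shrunk to 50 ms so the demo runs quickly; 3 items at 3 steps each should take about 9 steps in total:

```python
import asyncio
import time

STEP = 0.05  # 50 ms per step, small enough for a quick demo

async def produce(n):
    for x in range(n):
        await asyncio.sleep(STEP)
        yield x

async def process(agen):
    async for x in agen:
        await asyncio.sleep(2 * STEP)
        yield x

async def main():
    start = time.perf_counter()
    results = [x async for x in process(produce(3))]
    steps = (time.perf_counter() - start) / STEP
    return results, steps

results, steps = asyncio.run(main())
print(results)  # [0, 1, 2]
print(steps)    # roughly 9: the pipeline runs fully sequentially
```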

Everything looks synchronous. Now let's decorate the async generators to turn them into go-style generators:

@gogenerator
async def produce(n):
    for x in range(n):
        await asyncio.sleep(STEP)
        yield x

@gogenerator
async def process(gogen):
    async with gogen as chan:
        async for x in chan:
            await asyncio.sleep(2*STEP)
            yield x

async def main():
    gogen = process(produce(3))
    async with gogen as chan:
        async for x in chan:
            print(x)

Note that the async context managers are optional, but they make the task creation and termination more explicit.

Now here's what we get (WH means waiting for the handshake):

ITEMS : 0  1  2
STEP 0: A0    
STEP 1: B0 A0 
STEP 2: B1 WH 
STEP 3:    B0 A0
STEP 4:    B1 WH
STEP 5:       B0
STEP 6:       B1

Here we get some actual concurrency, and 2 steps have been saved.

Conclusion

Go-style generators have a nice property: they anticipate the producing operation but block until the data is actually transmitted, preventing congestion if the consumer is too slow.

As far as I understand, this is called reactive pull backpressure in the reactive world.
