I've recently been looking into the Go concurrency model to see how it compares to asyncio.
An interesting concept caught my attention: Go generators.
It roughly looks like this:
func count() <-chan int {
    c := make(chan int)
    go func() {
        for i := 0; ; i++ {
            c <- i
            time.Sleep(1 * time.Second)
        }
    }()
    return c
}
It's a function that spawns a goroutine and returns a channel to retrieve the data produced by the goroutine.
Channels are by default unbuffered, which means producer and consumer will wait for each other to send and receive data, respectively. That's basically a handshake.
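To make the handshake concrete, here is a minimal sketch of an unbuffered, rendezvous-style channel written with asyncio. The class and method names are my own, not part of any library; the acknowledgment queue is just one way to get the pairwise waiting:

```python
import asyncio

class RendezvousChannel:
    # A sketch of an unbuffered channel: send() only returns once a
    # receiver has actually taken the value, like Go's default channels.
    def __init__(self):
        self._item = asyncio.Queue(maxsize=1)
        self._ack = asyncio.Queue(maxsize=1)

    async def send(self, value):
        await self._item.put(value)
        await self._ack.get()  # block until the receiver confirms

    async def receive(self):
        value = await self._item.get()
        await self._ack.put(None)  # complete the handshake
        return value
```

With this, a producer task calling send() stays suspended until some consumer calls receive(), which is exactly the mutual waiting described above.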
Asynchronous generators in python look very similar:
async def count():
    for x in itertools.count():
        yield x
        await asyncio.sleep(1)
It's created the same way, and it also has the consumer waiting for the producer to yield a value.
However, there is a big difference: the asynchronous generator does not run in its own task; the producing operation is explicitly requested by the consumer through asynchronous iteration.
In order to get the same behavior in asyncio, we have to spawn a task to iterate over the generator and put the data in a channel. The consumer can then iterate asynchronously over the channel.
Both this and the handshake logic can be abstracted into a wrapper / decorator. The implementation is not so complicated, though it's not trivial either: aiogo.py
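For reference, here is one possible sketch of such a decorator. The actual aiogo.py certainly differs in its details: the names below, the use of a one-slot asyncio.Queue as the channel, and the restriction to the async with form are all my assumptions:

```python
import asyncio
import functools

def gogenerator(func):
    # Wrap an async generator function so that calling it returns a
    # go-style generator: the production runs in its own task.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return GoGenerator(func(*args, **kwargs))
    return wrapper

class GoGenerator:
    # Minimal sketch: only usable through "async with ... as chan".
    def __init__(self, agen):
        self._agen = agen
        self._queue = asyncio.Queue(maxsize=1)
        self._done = object()  # sentinel marking exhaustion
        self._task = None

    async def _pump(self):
        # The producing task: iterate the generator, feed the channel.
        async for item in self._agen:
            await self._queue.put(item)
        await self._queue.put(self._done)

    async def __aenter__(self):
        self._task = asyncio.create_task(self._pump())
        return self._chan()

    async def __aexit__(self, *exc):
        self._task.cancel()  # no-op if the pump already finished

    async def _chan(self):
        while (item := await self._queue.get()) is not self._done:
            yield item
```

Entering the async with block is what spawns the producing task; leaving it cancels the task if it is still running.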
As an example, consider the following code:
async def produce(n):
    for x in range(n):
        await asyncio.sleep(STEP)
        yield x

async def process(agen):
    async for x in agen:
        await asyncio.sleep(2*STEP)
        yield x

async def main():
    ait = process(produce(3))
    async for x in ait:
        print(x)
We're sending 3 items (0, 1 and 2) through a pipeline of two blocks:
- the item is produced, takes 1 step: A0
- the item is processed, takes 2 steps: B0 and B1
Now, here's what happens if regular async generators are used:
ITEMS : 0 1 2
STEP 0: A0
STEP 1: B0
STEP 2: B1
STEP 3: A0
STEP 4: B0
STEP 5: B1
STEP 6: A0
STEP 7: B0
STEP 8: B1
Everything looks synchronous. Now let's decorate the async generators to turn them into go-style generators:
@gogenerator
async def produce(n):
    for x in range(n):
        await asyncio.sleep(STEP)
        yield x

@gogenerator
async def process(gogen):
    async with gogen as chan:
        async for x in chan:
            await asyncio.sleep(2*STEP)
            yield x

async def main():
    gogen = process(produce(3))
    async with gogen as chan:
        async for x in chan:
            print(x)
Note that the async context managers are optional, but they make the task creation / termination more explicit.
Now here's what we get (WH means waiting for the handshake):
ITEMS : 0 1 2
STEP 0: A0
STEP 1: B0 A0
STEP 2: B1 WH
STEP 3: B0 A0
STEP 4: B1 WH
STEP 5: B0
STEP 6: B1
Here we get some actual concurrency, and 2 steps have been saved.
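The shortened timeline can be reproduced without aiogo.py by bridging just the producer into its own task through a one-slot queue. This is a simplified stand-in for the decorator, with names of my own choosing; note that the one-slot queue allows one item of buffering rather than a strict handshake, but the total for this example works out the same:

```python
import asyncio
import time

STEP = 0.05  # seconds per step, chosen arbitrarily for the demo
_DONE = object()  # sentinel marking the end of the stream

async def produce(n):
    for x in range(n):
        await asyncio.sleep(STEP)
        yield x

async def process(agen):
    async for x in agen:
        await asyncio.sleep(2 * STEP)
        yield x

async def in_task(agen):
    # Iterate agen in a background task, hand items over via a small queue.
    queue = asyncio.Queue(maxsize=1)

    async def pump():
        async for item in agen:
            await queue.put(item)
        await queue.put(_DONE)

    task = asyncio.create_task(pump())
    try:
        while (item := await queue.get()) is not _DONE:
            yield item
    finally:
        task.cancel()  # harmless if the pump already finished

async def main():
    start = time.monotonic()
    items = [x async for x in process(in_task(produce(3)))]
    steps = (time.monotonic() - start) / STEP
    return items, steps

items, steps = asyncio.run(main())
print(items, round(steps))
```

The measured duration comes out close to 7 steps instead of 9, because each item's production overlaps with the processing of the previous one.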
Go-style generators have a nice property: they anticipate the producing operation but block until the data is actually transmitted, preventing congestion if the consumer is too slow.
As far as I understand, this is called reactive pull backpressure in the reactive world.