@A5rocks
Last active August 24, 2020 02:27
Dispatch with Trio
# pretty basic backpressured pubsub
import trio
import typing


class PubSub:
    topics_to_channels: typing.MutableMapping[
        str,
        typing.List[trio.MemorySendChannel]
    ]

    def __init__(self) -> None:
        self.topics_to_channels = {}

    def _get_topic(self, topic: str) -> typing.List[trio.MemorySendChannel]:
        return self.topics_to_channels.get(topic, [])

    def subscribe(self, topic: str) -> trio.MemoryReceiveChannel:
        if self.topics_to_channels.get(topic) is None:
            self.topics_to_channels[topic] = []

        send, recv = trio.open_memory_channel(0)
        self.topics_to_channels[topic].append(send)
        return recv

    async def publish(self, topic: str, message: typing.Any) -> float:
        chans = self._get_topic(topic)
        res = 0.0

        # iterate over a copy, since we may remove dead channels as we go
        for chan in list(chans):
            try:
                chan.send_nowait(message)
                await trio.lowlevel.checkpoint()
            except trio.WouldBlock:
                await trio.lowlevel.checkpoint()
                # the backpressure thing is full
                res += 1 / len(chans)
            except trio.ClosedResourceError:
                await trio.lowlevel.checkpoint()
                # someone closed their send channel without desubscribing; drop it
                self.topics_to_channels[topic].remove(chan)
            except trio.BrokenResourceError:
                # there's no receive stream left!
                await chan.aclose()
                self.topics_to_channels[topic].remove(chan)

        # we return the "backpressure rating" (tm)
        return res


async def consumer(topic: str, identity: int, pubsub: PubSub) -> None:
    channel = pubsub.subscribe(topic)

    async for message in channel:
        print(f'got a new message in consumer #{identity}! {message!r}')


async def main() -> None:
    async with trio.open_nursery() as nursery:
        pubsub = PubSub()

        for i in range(5):
            nursery.start_soon(consumer, 'topic_thing', i, pubsub)

        # let's let the consumers subscribe
        await trio.sleep(1)

        await pubsub.publish('topic_thing', 'Hello, world!')

        # close the send halves so the consumers (and the nursery) can exit
        for chan in pubsub._get_topic('topic_thing'):
            await chan.aclose()


trio.run(main)
A5rocks commented Aug 24, 2020

Benchmarking

I used the following code:

async def consumer(topic: str, identity: int, pubsub: PubSub) -> None:
    channel = pubsub.subscribe(topic)

    async for message in channel:
        # print(f'got a new message in consumer #{identity}! {message!r}')
        pass

async def main() -> None:
    async with trio.open_nursery() as nursery:
        pubsub = PubSub()

        for i in range(<# of consumers>):
            nursery.start_soon(consumer, 'topic_thing', i, pubsub)

        # let's let the consumers subscribe
        await trio.sleep(1)

        i = 0
        while i < 100 * 1000:
            bp = await pubsub.publish('topic_thing', 'Hello, world!')
            i += 1
            # let's give the subscribers a chance not to backpressure
            if bp > 0:
                i -= 1
                await trio.lowlevel.checkpoint()

        for writer in pubsub._get_topic('topic_thing'):
            await writer.aclose()

trio.run(main)

And ran it with a few example consumer counts, 10 times each. Note that I didn't do anything to prepare for the benchmarks, so the script occasionally switched from core to core; but eh, whatever, I just wanted rough numbers.

With 0 consumers and pypy:

0.38s user 0.04s system 28% cpu 1.432 total
0.37s user 0.05s system 29% cpu 1.445 total
0.39s user 0.03s system 29% cpu 1.443 total
0.45s user 0.03s system 31% cpu 1.496 total
0.44s user 0.03s system 31% cpu 1.493 total
0.46s user 0.03s system 32% cpu 1.506 total
0.45s user 0.03s system 32% cpu 1.503 total
0.41s user 0.05s system 31% cpu 1.479 total
0.45s user 0.03s system 32% cpu 1.500 total
0.42s user 0.03s system 30% cpu 1.467 total

With 1 consumer and pypy:

16.67s user 0.07s system 94% cpu 17.769 total
16.52s user 0.07s system 94% cpu 17.620 total
16.59s user 0.08s system 94% cpu 17.734 total
16.35s user 0.08s system 94% cpu 17.466 total
16.37s user 0.07s system 94% cpu 17.463 total
16.24s user 0.06s system 94% cpu 17.344 total
16.52s user 0.07s system 94% cpu 17.655 total
16.16s user 0.07s system 94% cpu 17.260 total
15.85s user 0.07s system 93% cpu 16.954 total
17.00s user 0.08s system 94% cpu 18.101 total

With 2 consumers and pypy:

30.92s user 0.08s system 96% cpu 32.051 total
30.15s user 0.10s system 96% cpu 31.305 total
29.73s user 0.11s system 96% cpu 30.873 total
29.77s user 0.14s system 96% cpu 30.964 total
30.14s user 0.07s system 96% cpu 31.249 total
29.19s user 0.08s system 96% cpu 30.302 total
29.60s user 0.06s system 96% cpu 30.694 total
30.19s user 0.10s system 96% cpu 31.322 total
29.52s user 0.09s system 96% cpu 30.644 total
29.51s user 0.08s system 96% cpu 30.654 total

With 0 consumers and cpython:

0.15s user 0.02s system 14% cpu 1.168 total
0.16s user 0.00s system 13% cpu 1.162 total
0.15s user 0.01s system 13% cpu 1.162 total
0.15s user 0.02s system 14% cpu 1.169 total
0.16s user 0.00s system 13% cpu 1.164 total
0.17s user 0.00s system 14% cpu 1.170 total
0.16s user 0.02s system 14% cpu 1.173 total
0.16s user 0.01s system 14% cpu 1.175 total
0.14s user 0.02s system 14% cpu 1.168 total
0.15s user 0.02s system 13% cpu 1.163 total

With 1 consumer and cpython:

9.29s user 0.03s system 90% cpu 10.321 total
9.02s user 0.05s system 90% cpu 10.073 total
9.26s user 0.04s system 90% cpu 10.314 total
9.12s user 0.04s system 90% cpu 10.167 total
9.03s user 0.04s system 90% cpu 10.078 total
9.14s user 0.06s system 90% cpu 10.202 total
9.08s user 0.02s system 90% cpu 10.102 total
9.05s user 0.03s system 90% cpu 10.083 total
8.97s user 0.05s system 89% cpu 10.033 total
9.17s user 0.04s system 90% cpu 10.222 total

With 2 consumers and cpython:

14.97s user 0.02s system 93% cpu 15.998 total
14.82s user 0.04s system 93% cpu 15.873 total
15.08s user 0.06s system 93% cpu 16.147 total
15.16s user 0.07s system 93% cpu 16.232 total
14.90s user 0.07s system 93% cpu 15.977 total
15.09s user 0.04s system 93% cpu 16.139 total
15.17s user 0.04s system 93% cpu 16.224 total
15.13s user 0.05s system 93% cpu 16.186 total
15.03s user 0.04s system 93% cpu 16.087 total
14.92s user 0.05s system 93% cpu 15.968 total

pypy version:

Python 3.6.9 (2ad108f17bdb, Apr 07 2020, 02:59:05)
[PyPy 7.3.1 with GCC 7.3.1 20180303 (Red Hat 7.3.1-5)]

cpython version:

Python 3.8.2 (default, Jun 16 2020, 00:07:49) 
[GCC 8.3.0] on linux

TL;DR: CPython is actually pretty great! Then again, a short-lived benchmark like this isn't the long-running workload PyPy is built for, so I will still try to support it :P


A5rocks commented Aug 24, 2020

I really underestimated async's overhead. Here are the results for 2 consumers on pypy with the await trio.lowlevel.checkpoint() calls in PubSub.publish commented out:

13.19s user 0.08s system 91% cpu 14.496 total
12.81s user 0.05s system 92% cpu 13.891 total
12.50s user 0.06s system 92% cpu 13.581 total
12.57s user 0.07s system 92% cpu 13.672 total
12.53s user 0.06s system 92% cpu 13.614 total
12.71s user 0.06s system 92% cpu 13.796 total
12.47s user 0.05s system 92% cpu 13.541 total
12.38s user 0.07s system 92% cpu 13.475 total
12.42s user 0.06s system 92% cpu 13.505 total
12.63s user 0.02s system 92% cpu 13.674 total

And while this makes for a not very trio-nic (?) API, we can move the closing of the send channel into a desubscribe call and then allow BrokenResourceError to propagate.
