Last active
August 24, 2020 02:27
-
-
Save A5rocks/1e6f27ea98b11676cc70c2c3ab7aa187 to your computer and use it in GitHub Desktop.
Dispatch with Trio
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# pretty basic backpressured pubsub | |
import trio | |
import typing | |
class PubSub: | |
topics_to_channels: typing.MutableMapping[ | |
str, | |
typing.List[trio.MemorySendChannel] | |
] | |
def __init__(self) -> None: | |
self.topics_to_channels = {} | |
def _get_topic(self, topic: str) -> typing.List[trio.MemorySendChannel]: | |
return self.topics_to_channels.get(topic, []) | |
def subscribe(self, topic: str) -> trio.MemoryReceiveChannel: | |
if self.topics_to_channels.get(topic) is None: | |
self.topics_to_channels[topic] = [] | |
send, recv = trio.open_memory_channel(0) | |
self.topics_to_channels[topic].append(send) | |
return recv | |
async def publish(self, topic: str, message: typing.Any) -> float: | |
chans = self._get_topic(topic) | |
res = 0.0 | |
for chan in chans: | |
try: | |
chan.send_nowait(message) | |
await trio.lowlevel.checkpoint() | |
except trio.WouldBlock: | |
await trio.lowlevel.checkpoint() | |
# the backpressure thing is full | |
res += 1 / len(chans) | |
except trio.ClosedResourceError: | |
await trio.lowlevel.checkpoint() | |
# wtf why are we trying to send to a channel that is closed | |
self.topics_to_channels[topic].remove(chan) | |
except trio.BrokenResourceError: | |
# there's no receive streams! | |
await chan.aclose() | |
self.topics_to_channels[topic].remove(chan) | |
# we return the "backpressure rating":tm::tm: | |
return res | |
async def consumer(topic: str, identity: int, pubsub: PubSub) -> None: | |
channel = pubsub.subscribe(topic) | |
async for message in channel: | |
print(f'got a new message in consumer #{identity}! {message!r}') | |
async def main() -> None: | |
async with trio.open_nursery() as nursery: | |
pubsub = PubSub() | |
for i in range(5): | |
nursery.start_soon(consumer, 'topic_thing', i, pubsub) | |
# let's let the consumers subscribe | |
await trio.sleep(1) | |
await pubsub.publish('topic_thing', 'Hello, world!') | |
trio.run(main) |
I really underestimated async's overhead. Here's the results for 2 consumers and pypy
with await trio.lowlevel.checkpoint()
in PubSub.publish
commented out:
13.19s user 0.08s system 91% cpu 14.496 total
12.81s user 0.05s system 92% cpu 13.891 total
12.50s user 0.06s system 92% cpu 13.581 total
12.57s user 0.07s system 92% cpu 13.672 total
12.53s user 0.06s system 92% cpu 13.614 total
12.71s user 0.06s system 92% cpu 13.796 total
12.47s user 0.05s system 92% cpu 13.541 total
12.38s user 0.07s system 92% cpu 13.475 total
12.42s user 0.06s system 92% cpu 13.505 total
12.63s user 0.02s system 92% cpu 13.674 total
And while this makes for a not trio-nic (?) API, we can move the closing of the writing channel into a desubscribe
call and then allow BrokenResourceException
to propagate.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Benchmarking
I used the following code:
And ran with some example numbers of consumers 10 times each. Note that I didn't do anything to prepare for the benchmarks so the script was switching from core to core occasionally, but eh, whatever. I just wanted to have rough numbers.
With 0 consumers and
pypy
:With 1 consumer and
pypy
:With 2 consumers and
pypy
:With 0 consumers and
cpython
:With 1 consumer and
cpython
:With 2 consumers and
cpython
:pypy
version:cpython
version:TL;DR cpython is actually pretty great! Though this isn't the long-term system pypy is built for, so I will still try to support it :P