Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
import trio
import trio.testing
# A simple protocol where messages are single bytes b"a", b"b", b"c", etc.,
# and each one is acknowledged by echoing it back uppercased. So send b"a",
# get b"A", etc.
# To make it easier to shut down, message b"z" causes the receiver to exit.
async def receiver(stream):
    """Acknowledge single-byte messages until told to shut down.

    Repeatedly reads one byte from ``stream`` with ``receive_some(1)`` and
    echoes it back uppercased with ``send_all``. (A real program would
    probably do something with each message before acknowledging it.)

    The loop ends after the message ``b"z"`` has been acknowledged, or when
    ``receive_some`` returns an empty result (which Trio streams use to
    signal end-of-file), so the task cannot spin forever on a closed stream.

    Args:
        stream: An object providing awaitable ``receive_some(max_bytes)``
            and ``send_all(data)`` methods.
    """
    while True:
        print("receiver: waiting for message to arrive")
        # Get message
        message = await stream.receive_some(1)
        if not message:
            # Nothing more will ever arrive; without this check the loop
            # would keep "acknowledging" empty messages indefinitely.
            print("receiver: stream closed, exiting")
            return
        # Acknowledge message
        reply = message.upper()
        print(f"receiver: got {message}, sending {reply}")
        await stream.send_all(reply)
        # b"z" is the shutdown message: acknowledge it first, then stop.
        if message == b"z":
            print("receiver: exiting")
            return
# Now we want to send a whole batch of messages at once, and wait for the
# replies to all of them. There are a few different ways to do this.
# The simplest way is to send the first message, wait for the reply, send the
# next message, wait for the reply, etc. This strategy is simple, and probably
# the best place to start. You can wrap the basic send/receive flow into a
# single function that sends a message and then waits for the reply and
# returns it.
async def send_message_batch_0(stream, messages):
    """Send ``messages`` one at a time, waiting for each reply in turn.

    No pipelining: the next message is only sent once the one-byte reply to
    the previous message has been read back from ``stream``.

    Args:
        stream: An object providing awaitable ``send_all(data)`` and
            ``receive_some(max_bytes)`` methods.
        messages: Iterable of messages to send, in order.
    """

    async def round_trip(outgoing):
        # One complete exchange: push the message, then block on its reply.
        print(f"send_message_batch_0: sending {outgoing}")
        await stream.send_all(outgoing)
        print("send_message_batch_0: waiting for reply")
        answer = await stream.receive_some(1)
        print(f"send_message_batch_0: got reply {answer}")

    for outgoing in messages:
        await round_trip(outgoing)
# Another way to do it is to send all the messages in one big batch, and
# *then* wait for all the replies. Sending multiple messages without waiting
# for the replies is called "pipelining", and it can speed things up if you
# have a high-latency link, because it reduces the number of round trips.
# However, this implementation has a subtle bug! It can deadlock if the stream
# runs out of buffer space. This can lead to frustrating intermittent
# problems, because whether you hit it or not depends on the size of the
# batch, the size of the messages, how much buffer the OS decides to allocate
# to this socket connection, the state of intermediate network routers, the
# phase of the moon, etc.
async def send_message_batch_1(stream, messages):  # This one is buggy!
    """Pipeline ``messages``: send the whole batch, then read every reply.

    Deliberately kept as the *buggy* example. Nothing reads replies until
    every message has been sent, so if the stream cannot buffer the
    outstanding data the send phase never finishes and the call deadlocks.

    Args:
        stream: An object providing awaitable ``send_all(data)`` and
            ``receive_some(max_bytes)`` methods.
        messages: Sized collection of messages to send, in order.
    """
    # Phase 1: push the entire batch without looking at any reply.
    for outgoing in messages:
        print(f"send_message_batch_1: sending {outgoing}")
        await stream.send_all(outgoing)

    # Phase 2: only now collect one reply per message that was sent.
    outstanding = len(messages)
    while outstanding > 0:
        print("send_message_batch_1: waiting for reply")
        answer = await stream.receive_some(1)
        print(f"send_message_batch_1: got reply {answer}")
        outstanding -= 1
# Here's another way to do pipelining: it's the exact same code as the
# function above, but now instead of running the two loops sequentially, we
# run them concurrently in two different tasks. This fixes the bug.
async def send_message_batch_2(stream, messages):
    """Pipeline ``messages`` with sending and receiving in separate tasks.

    The send loop and the reply loop run concurrently in a nursery, so
    replies are read while later messages are still being sent.

    Args:
        stream: An object providing awaitable ``send_all(data)`` and
            ``receive_some(max_bytes)`` methods.
        messages: Sized collection of messages to send, in order.
    """

    async def send_all():
        for message in messages:
            print(f"send_message_batch_2: sending {message}")
            await stream.send_all(message)

    async def wait_for_all_replies():
        # One reply is expected per message sent.
        for _ in range(len(messages)):
            print("send_message_batch_2: waiting for reply")
            reply = await stream.receive_some(1)
            print(f"send_message_batch_2: got reply {reply}")

    # Start both loops as child tasks; the nursery block does not exit
    # until both of them have finished.
    async with trio.open_nursery() as nursery:
        nursery.start_soon(send_all)
        nursery.start_soon(wait_for_all_replies)
# Trio's `trio.testing.lockstep_stream_pair` helper is useful to help test for
# these kinds of subtle flow-control based deadlocks. It simulates the
# worst-case network connection: one that has no buffering whatsoever. So now
# the bug is easy to catch, without having to mock the phase of the moon.
async def test_send_message_batch_impl(send_message_batch_impl):
    """Run ``send_message_batch_impl`` against ``receiver`` over a stream pair.

    The two ends come from ``trio.testing.lockstep_stream_pair()``. One task
    plays the server and one plays the client, both inside a single nursery.

    Args:
        send_message_batch_impl: Async callable taking ``(stream, messages)``.
    """
    server_end, client_end = trio.testing.lockstep_stream_pair()
    # The last message is b"z", so the receiver will exit cleanly.
    batch = [b"a", b"b", b"z"]
    async with trio.open_nursery() as nursery:
        # Server side: the receiver loop.
        nursery.start_soon(receiver, server_end)
        # Client side: the implementation under test sends the batch.
        nursery.start_soon(send_message_batch_impl, client_end, batch)
# Try running the program to test these three different implementations. You
# should find that send_message_batch_0 and send_message_batch_2 work, but
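# send_message_batch_1 gets stuck and locks up. For example:
#   trio.run(test_send_message_batch_impl, send_message_batch_0)
#   trio.run(test_send_message_batch_impl, send_message_batch_1)
#   trio.run(test_send_message_batch_impl, send_message_batch_2)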
# TODO to make this a good example for docs:
# - once we have helpers for working with lines, use a line-based protocol to
# make it more realistic.
# - demonstrate running the protocol over TCP, to show how it works for some
# message/batch sizes, but if you make them big enough then you can provoke
# the deadlock.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment