Skip to content

Instantly share code, notes, and snippets.

import trio
import functools
class ReplicatedBroadcastFailed(Exception):
class ReplicatedBroadcast:
def __init__(self, nursery, async_iterable):
self._nursery = nursery
# Create a new network namespace named 'bad', which will come with its own private loopback interface
sudo ip netns create bad
# Make the loopback interface slow, drop packets, etc.
sudo ip netns exec bad tc qdisc add dev lo root netem delay 200ms 40ms 25% # ... add more netem options here
# Turn on the loopback interface
sudo ip netns exec bad ip link set lo up
# Start a shell inside the new namespace
sudo ip netns exec bad sudo -u $USER /bin/bash
# Try running 'ping' to see how bad the network is
async def with_item_timeout(aiter, timeout):
aiter = aiter.__aiter__()
while True:
with trio.move_on_after(timeout) as cscope:
val = await aiter.__anext__()
except StopAsyncIteration:
if cscope.cancelled_caught:
# Rough draft of a Queue object that can be used simultaneously simultaneously from
# sync threads + *multiple* trio threads, all at once.
# If you don't have multiple threads each doing their own separate calls to,
# then don't use this; there are simpler solutions. This was mostly an exercise to
# figure out if and how this could be done.
# Also note: completely untested, probably has bugs
from collections import OrderedDict

Question: suppose we wanted to let users mostly deal with individual exception objects instead of ExceptionGroups, and we want to do that without losing the rich traceback metadata. (Tree structure, notes about places where exceptions crossed between tasks, ...) What would that look like?

The big challenge is that in current trio.MultiError, if you want to extract an individual exception and make it a proper standalone object, you have to mutate its __traceback__ attribute to capture the info that's otherwise only present in the MultiError object. (And this in turn is the main reason I've argued for except ValueError as exc being run once with a single ExceptionGroup: if we iterate, then we have to extract individual exceptions. If we wrap in a new ExceptionGroup, then we never have to mutate any exception objects in place.) So if we can solve this mutation problem, then it unlocks a lot of new possibilities.

Requirements would be:

  • Given an individual exception exc, you can print its trac
import trio
import weakref
import cachetools
# Note: this assumes you only have one trio thread running at once...
# In the unlikely event that you have multiple, you should put the locks and cache
# into thread-local or run-local storage.
def trio_lru_cache(fn):
cache = cachetools.LRUCache()
# Trick: this contains a lock for each unique cache key in use,
class EnterTimeout:
def __init__(self, async_cm, timeout):
self._async_cm = async_cm
self._timeout = timeout
async def __aenter__(self):
with fail_after(self.timeout):
return await self._async_cm.__aenter__()
async def __aexit__(self, *args):
import trio
import sys
import time
import httpx
from outcome import Error
import traceback
# Can't use PySide2 currently because of
from PyQt5 import QtCore, QtWidgets
import trio
import trio.testing
# A simple protocol where messages are single bytes b"a", b"b", b"c", etc.,
# and each one is acknowledged by echoing it back uppercased. So send b"a",
# get b"A", etc.
# To make it easier to shut down, message b"z" causes the receiver to exit.
async def receiver(stream):
from byoio import receive_until, receive_some
async def decode_chunked(stream, buf):
while True:
header = await receive_until(stream, buf, b"\r\n")
to_read = int(header.strip()) # FIXME: proper validation
if to_read == 0:
# FIXME: read trailers
while to_read > 0: