# Rough draft of a Queue object that can be used simultaneously from
# sync threads + *multiple* trio threads, all at once.
# If you don't have multiple threads each doing their own separate calls to,
# then don't use this; there are simpler solutions. This was mostly an exercise to
# figure out if and how this could be done.
# Also note: completely untested, probably has bugs
from collections import OrderedDict

Question: suppose we wanted to let users mostly deal with individual exception objects instead of ExceptionGroups, and we want to do that without losing the rich traceback metadata. (Tree structure, notes about places where exceptions crossed between tasks, ...) What would that look like?

The big challenge is that in current trio.MultiError, if you want to extract an individual exception and make it a proper standalone object, you have to mutate its __traceback__ attribute to capture the info that's otherwise only present in the MultiError object. (And this in turn is the main reason I've argued for except ValueError as exc being run once with a single ExceptionGroup: if we iterate, then we have to extract individual exceptions. If we wrap in a new ExceptionGroup, then we never have to mutate any exception objects in place.) So if we can solve this mutation problem, then it unlocks a lot of new possibilities.

Requirements would be:

  • Given an individual exception exc, you can print its trac
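
To make the mutation problem a bit more concrete, here is a minimal sketch in plain Python, with no Trio or MultiError involved. The helper names (`tb_depth`, `inner`, `capture`, `reraise`, `demo`) are made up for illustration. It only shows that `__traceback__` is mutable state living on the exception object itself: re-raising the same object adds entries to it, so whoever held a reference before sees a different traceback afterwards.

```python
def tb_depth(exc):
    """Count the entries in exc.__traceback__."""
    depth = 0
    tb = exc.__traceback__
    while tb is not None:
        depth += 1
        tb = tb.tb_next
    return depth


def inner():
    raise ValueError("boom")


def capture():
    # Catch the exception and hand the object back to the caller.
    try:
        inner()
    except ValueError as exc:
        return exc


def reraise(exc):
    # Raising an existing exception object extends its traceback in place.
    raise exc


def demo():
    exc = capture()
    before = tb_depth(exc)
    try:
        reraise(exc)
    except ValueError as same:
        assert same is exc  # same object, not a copy
    after = tb_depth(exc)
    return before, after


before, after = demo()
```

Here `after` ends up larger than `before`, even though nothing ever assigned to `exc.__traceback__` explicitly. Wrapping exceptions in a new group object avoids touching the originals, which is the trade-off discussed above.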
import trio
import weakref
import cachetools
# Note: this assumes you only have one trio thread running at once...
# In the unlikely event that you have multiple, you should put the locks and cache
# into thread-local or run-local storage.
def trio_lru_cache(fn):
    cache = cachetools.LRUCache()
    # Trick: this contains a lock for each unique cache key in use,
from trio import fail_after

class EnterTimeout:
    def __init__(self, async_cm, timeout):
        self._async_cm = async_cm
        self._timeout = timeout

    async def __aenter__(self):
        # The timeout only covers entering the wrapped context manager
        with fail_after(self._timeout):
            return await self._async_cm.__aenter__()

    async def __aexit__(self, *args):
import trio
import sys
import time
import httpx
from outcome import Error
import traceback
# Can't use PySide2 currently because of
from PyQt5 import QtCore, QtWidgets
import trio
import trio.testing
# A simple protocol where messages are single bytes b"a", b"b", b"c", etc.,
# and each one is acknowledged by echoing it back uppercased. So send b"a",
# get b"A", etc.
# To make it easier to shut down, message b"z" causes the receiver to exit.
async def receiver(stream):
from byoio import receive_until, receive_some
async def decode_chunked(stream, buf):
    while True:
        header = await receive_until(stream, buf, b"\r\n")
        # Chunk sizes are hexadecimal
        to_read = int(header.strip(), 16)  # FIXME: proper validation
        if to_read == 0:
            # FIXME: read trailers
        while to_read > 0:
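
Since the preview is truncated, here is a simplified sketch of the same chunked-decoding loop that works on a complete in-memory `bytes` object instead of a stream. It does not use `byoio` at all. The function name `decode_chunked_bytes` is made up, and it makes the same shortcuts the FIXMEs mention: minimal validation, and trailers are skipped rather than parsed.

```python
def decode_chunked_bytes(data: bytes) -> bytes:
    """Decode an HTTP/1.1 chunked body held entirely in memory."""
    body = bytearray()
    pos = 0
    while True:
        # Each chunk starts with "<hex size>[;extensions]\r\n"
        end = data.index(b"\r\n", pos)
        size = int(data[pos:end].split(b";")[0].strip(), 16)
        pos = end + 2
        if size == 0:
            # Last chunk; trailers (if any) are ignored in this sketch
            return bytes(body)
        body += data[pos:pos + size]
        pos += size
        # Every chunk's data is followed by its own CRLF
        if data[pos:pos + 2] != b"\r\n":
            raise ValueError("missing CRLF after chunk data")
        pos += 2


decoded = decode_chunked_bytes(b"4\r\nWiki\r\n5\r\npedia\r\n0\r\n\r\n")
```

A streaming version has the same shape, but each slice becomes an `await` that reads more data when the buffer runs short.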
import trio
async def worker(receive_chan):
    # Trick: each url gets its own clone of the send channel
    # After processing a URL, we close its clone
    # That way, when all the URLs are done, all the send clones will be
    # closed, so the 'async for ... in receive_chan' will automatically exit.
    async for send_chan, url in receive_chan:
import socket
import errno
import select
a, b = socket.socketpair(family=socket.AF_UNIX, type=socket.SOCK_DGRAM)
while True:
import trio
import os
import json
from itertools import count
class Trace(
    def __init__(self, out):
        self.out = out