Skip to content

Instantly share code, notes, and snippets.

import sys
import msvcrt
import ctypes
ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
kernel32.GetConsoleMode.argtypes = [ctypes.c_void_p, ctypes.POINTER(ctypes.c_uint)]
kernel32.GetConsoleMode.restype = ctypes.c_int
import trio
import functools
class ReplicatedBroadcastFailed(Exception):
pass
class ReplicatedBroadcast:
def __init__(self, nursery, async_iterable):
self._nursery = nursery
# Create a new network namespace named 'bad', which will come with its own private loopback interface
sudo ip netns create bad
# Make the loopback interface slow, drop packets, etc.
sudo ip netns exec bad tc qdisc add dev lo root netem delay 200ms 40ms 25% # ... add more netem options here
# Turn on the loopback interface
sudo ip netns exec bad ip link set lo up
# Start a shell inside the new namespace
sudo ip netns exec bad sudo -u $USER /bin/bash
# Try running 'ping 127.0.0.1' to see how bad the network is
async def with_item_timeout(aiter, timeout):
aiter = aiter.__aiter__()
while True:
with trio.move_on_after(timeout) as cscope:
try:
val = await aiter.__anext__()
except StopAsyncIteration:
return
if cscope.cancelled_caught:
print("cancelled")
# Rough draft of a Queue object that can be used simultaneously simultaneously from
# sync threads + *multiple* trio threads, all at once.
#
# If you don't have multiple threads each doing their own separate calls to trio.run,
# then don't use this; there are simpler solutions. This was mostly an exercise to
# figure out if and how this could be done.
#
# Also note: completely untested, probably has bugs
from collections import OrderedDict

Question: suppose we wanted to let users mostly deal with individual exception objects instead of ExceptionGroups, and we want to do that without losing the rich traceback metadata. (Tree structure, notes about places where exceptions crossed between tasks, ...) What would that look like?

The big challenge is that in current trio.MultiError, if you want to extract an individual exception and make it a proper standalone object, you have to mutate its __traceback__ attribute to capture the info that's otherwise only present in the MultiError object. (And this in turn is the main reason I've argued for except ValueError as exc being run once with a single ExceptionGroup: if we iterate, then we have to extract individual exceptions. If we wrap in a new ExceptionGroup, then we never have to mutate any exception objects in place.) So if we can solve this mutation problem, then it unlocks a lot of new possibilities.

Requirements would be:

  • Given an individual exception exc, you can print its trac
import trio
import weakref
import cachetools
# Note: this assumes you only have one trio thread running at once...
# In the unlikely event that you have multiple, you should put the locks and cache
# into thread-local or run-local storage.
def trio_lru_cache(fn):
cache = cachetools.LRUCache()
# Trick: this contains a lock for each unique cache key in use,
class EnterTimeout:
def __init__(self, async_cm, timeout):
self._async_cm = async_cm
self._timeout = timeout
async def __aenter__(self):
with fail_after(self.timeout):
return await self._async_cm.__aenter__()
async def __aexit__(self, *args):
import trio
import sys
import time
import httpx
from outcome import Error
import traceback
# Can't use PySide2 currently because of
# https://bugreports.qt.io/projects/PYSIDE/issues/PYSIDE-1313
from PyQt5 import QtCore, QtWidgets
import trio
import trio.testing
# A simple protocol where messages are single bytes b"a", b"b", b"c", etc.,
# and each one is acknowledged by echoing it back uppercased. So send b"a",
# get b"A", etc.
# To make it easier to shut down, message b"z" causes the receiver to exit.
async def receiver(stream):