Skip to content

Instantly share code, notes, and snippets.

View cache.py
import trio
import weakref
import cachetools
# Note: this assumes you only have one trio thread running at once...
# In the unlikely event that you have multiple, you should put the locks and cache
# into thread-local or run-local storage.
def trio_lru_cache(fn):
cache = cachetools.LRUCache()
# Trick: this contains a lock for each unique cache key in use,
View enter-timeout.py
class EnterTimeout:
def __init__(self, async_cm, timeout):
self._async_cm = async_cm
self._timeout = timeout
async def __aenter__(self):
with fail_after(self.timeout):
return await self._async_cm.__aenter__()
async def __aexit__(self, *args):
View trio-qt-demo.py
import trio
import sys
import time
import httpx
from outcome import Error
import traceback
# Can't use PySide2 currently because of
# https://bugreports.qt.io/projects/PYSIDE/issues/PYSIDE-1313
from PyQt5 import QtCore, QtWidgets
View send-deadlock-example.py
import trio
import trio.testing
# A simple protocol where messages are single bytes b"a", b"b", b"c", etc.,
# and each one is acknowledged by echoing it back uppercased. So send b"a",
# get b"A", etc.
# To make it easier to shut down, message b"z" causes the receiver to exit.
async def receiver(stream):
View chunked-coro.py
from byoio import receive_until, receive_some
async def decode_chunked(stream, buf):
while True:
header = await receive_until(stream, buf, b"\r\n")
to_read = int(header.strip()) # FIXME: proper validation
if to_read == 0:
# FIXME: read trailers
return
while to_read > 0:
View crawler.py
import trio
WORKER_COUNT = 10
async def worker(receive_chan):
# Trick: each url gets its own clone of the send channel
# After processing a URL, we close its clone
# That way, when all the URLs are done, all the send clones will be
# closed, so the 'async for ... in receive_chan' will automatically exit.
async for send_chan, url in receive_chan:
View macos-dgram-weirdness.py
import socket
import errno
import select
a, b = socket.socketpair(family=socket.AF_UNIX, type=socket.SOCK_DGRAM)
a.setblocking(False)
try:
while True:
a.send(b"hi")
View trace-test.py
import trio
import os
import json
from itertools import count
# https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/preview#
class Trace(trio.abc.Instrument):
def __init__(self, out):
self.out = out
View linux-output.txt
~/trio/notes-to-self$ python sleep-time.py
----------------------------------------------------------------
Starting: import select; select.select([], [], [], 6)
Expected duration: 6 seconds
Putting it to sleep for 2.0 seconds
Waking it up again
Actual duration: 8.05
----------------------------------------------------------------
Starting: import select; ep = select.epoll(); ep.poll(6)
Expected duration: 6 seconds
View generalized-move-on-after.py
# https://gitter.im/python-trio/general?at=5cddee9a6366992a94ce41c5
@asynccontextmanager
async def generalized_move_on_after():
async with trio.open_nursery() as nursery:
async def arbitrary_event():
await wait_for_some_arbitrary_event()
nursery.cancel_scope.cancel()
nursery.start_soon(arbitrary_event)
yield nursery.cancel_scope
nursery.cancel_scope.cancel()
You can’t perform that action at this time.