Skip to content

Instantly share code, notes, and snippets.

# This ignores some issues that a real production-quality version should
# address; see https://github.com/python-trio/trio/issues/279 for
# the full details. But it should handle simple cases OK.
async def open_unix_domain_listeners(path, *, permissions=None):
sock = trio.socket.socket(trio.socket.AF_UNIX, trio.socket.SOCK_STREAM)
try:
os.unlink(path)
except OSError:
pass
async def communicate(self, stdin):
stdout = []
stderr = []
async def receive_all_into(pipe, buf):
while True:
data = await pipe.receive_some(LIMIT)
if not data:
break
buf.append(data)
import h11
def write_headers_titlecase(headers, write):
for name, value in headers:
if name == b"host":
write(b"%s: %s\r\n" % (name.title(), value))
for name, value in headers:
if name != b"host":
write(b"%s: %s\r\n" % (name.title(), value))
write(b"\r\n")
@njsmith
njsmith / producer.py
Created October 21, 2018 08:24
Idea for ergonomic, safe alternative to async generators
from functools import wraps, partial
from contextlib import asynccontextmanager
import trio
def producer(wrapped):
@asynccontextmanager
@functools.wraps(wrapped)
async def wrapper(*args, **kwargs):
if "send_channel" in kwargs:
raise TypeError

Clone CPython: git clone ...

Set up a build directory:

cd cpython
mkdir b
cd b
../configure --prefix="$PWD/install" OPT="-O0"
@njsmith
njsmith / token-bucket.py
Last active October 2, 2018 23:53
Rate limited async iterator decorator
class TokenBucket:
def __init__(self, tokens_per_second, max_tokens, starting_tokens):
self._tokens_per_second = tokens_per_second
self._max_tokens = max_tokens
# floating point, because tracking partial tokens makes the code
# simpler.
self._tokens = starting_tokens
self._last_update_time = trio.current_time()
def _update(self):
@njsmith
njsmith / shouty.py
Last active September 21, 2018 03:45
# Trio program that reads from stdin, uppercases everything, and then echoes
# it back to stdout.
# This uses some internal APIs, because Trio v0.7.0 doesn't have a real public
# API for talking to stdin/stdout yet. This only works on Unix.
# If you want to help implement a real API for this, or just check whether
# anything new has happened, see:
# https://github.com/python-trio/trio/issues/174
import trio
# http://pylint.pycqa.org/en/latest/how_tos/custom_checkers.html
import astroid
from pylint.checkers import BaseChecker
from pylint.interfaces import IAstroidChecker
def register(linter):
linter.register_checker(MissingAwaitChecker(linter))
@njsmith
njsmith / improved-tracebacks.md
Created September 3, 2018 02:48
Improved tracebacks in Trio v0.7.0

Improved tracebacks in Trio v0.7.0

Trio is a friendly async concurrency library for Python that's obsessed with correctness and usability.

On the correctness side, one of the features that makes Trio unique is that when an exception occurs, it's never discarded: it always propogates until caught, and if it's not caught, then you automatically get a complete traceback, just like with regular Python. Errors should never pass silently!

But... in Trio v0.6.0 and earlier, these tracebacks also contained a lot of clutter showing details of Trio's internal plumbing, which made it difficult to see the parts that were relevant to your code. It's a small thing, but when it's midnight and you're debugging some nasty concurrency bug, it can make a big difference to have exactly the information you need, clearly laid out, without distractions.

And thanks to some hard work by @belm0, the upcoming Trio v0.7.0 will give you exactly that: clean tracebacks that show you wh

@asynccontextmanager
async def subscribe(host, port, subscription_info):
# Make connection, with a 'with' block so we make sure to close it eventually
async with await trio.open_tcp_stream(host, port) as stream:
# Subscribe to whatever channel
await msg_send(stream, subscription_info)
# Start heartbeat task
async with trio.open_nursery() as nursery:
nursery.start_soon(heartbeat_sender, stream)
# Done setting up this context manager. Return the message iterator object.