Clone CPython: git clone ...
Set up a build directory:
cd cpython
mkdir b
cd b
../configure --prefix="$PWD/install" OPT="-O0"

import os

import trio

# This ignores some issues that a real production-quality version should
# address; see https://github.com/python-trio/trio/issues/279 for
# the full details. But it should handle simple cases OK.
async def open_unix_domain_listeners(path, *, permissions=None):
    sock = trio.socket.socket(trio.socket.AF_UNIX, trio.socket.SOCK_STREAM)
    try:
        os.unlink(path)
    except OSError:
        pass

async def communicate(self, stdin):
    stdout = []
    stderr = []

    async def receive_all_into(pipe, buf):
        while True:
            data = await pipe.receive_some(LIMIT)
            if not data:
                break
            buf.append(data)

import h11

def write_headers_titlecase(headers, write):
    for name, value in headers:
        if name == b"host":
            write(b"%s: %s\r\n" % (name.title(), value))
    for name, value in headers:
        if name != b"host":
            write(b"%s: %s\r\n" % (name.title(), value))
    write(b"\r\n")
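
To see what the header writer above produces, here is a small standalone check. It copies the function as shown and feeds it a made-up header list; the `chunks` list and the example header values are assumptions for illustration, and header names are assumed to arrive as lowercase bytes, as the comparison against `b"host"` implies.

```python
def write_headers_titlecase(headers, write):
    # Same logic as the snippet above: emit Host first, then the rest.
    for name, value in headers:
        if name == b"host":
            write(b"%s: %s\r\n" % (name.title(), value))
    for name, value in headers:
        if name != b"host":
            write(b"%s: %s\r\n" % (name.title(), value))
    # A blank line terminates the header block.
    write(b"\r\n")


# Hypothetical input: Host deliberately listed last to show the reordering.
chunks = []
write_headers_titlecase(
    [(b"content-length", b"0"), (b"host", b"example.com")],
    chunks.append,
)
wire = b"".join(chunks)
print(wire)
# → b'Host: example.com\r\nContent-Length: 0\r\n\r\n'
```

`bytes.title()` capitalizes the first letter after every non-letter, which is why `content-length` becomes `Content-Length` without any special handling of the hyphen.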

from functools import wraps, partial
from contextlib import asynccontextmanager

import trio

def producer(wrapped):
    @asynccontextmanager
    @wraps(wrapped)
    async def wrapper(*args, **kwargs):
        if "send_channel" in kwargs:
            raise TypeError

import trio

class TokenBucket:
    def __init__(self, tokens_per_second, max_tokens, starting_tokens):
        self._tokens_per_second = tokens_per_second
        self._max_tokens = max_tokens
        # floating point, because tracking partial tokens makes the code
        # simpler.
        self._tokens = starting_tokens
        self._last_update_time = trio.current_time()

    def _update(self):
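
The preview above is cut off at `_update`, so the original body is not shown. The following is not that implementation; it is a minimal standalone sketch of the usual token-bucket refill step, under stated assumptions: the class name `SketchTokenBucket`, the `try_take` method, the injectable `clock` parameter, and `FakeClock` are all hypothetical, and `time.monotonic` stands in for `trio.current_time()` so the sketch runs without Trio.

```python
import time


class SketchTokenBucket:
    """Hypothetical stand-in for illustration; not the original class."""

    def __init__(self, tokens_per_second, max_tokens, starting_tokens,
                 clock=time.monotonic):
        self._tokens_per_second = tokens_per_second
        self._max_tokens = max_tokens
        # Floating point, so partial tokens accumulate between updates.
        self._tokens = starting_tokens
        self._clock = clock
        self._last_update_time = clock()

    def _update(self):
        # Credit tokens for the time elapsed since the last update,
        # capped at the bucket's capacity.
        now = self._clock()
        elapsed = now - self._last_update_time
        self._tokens = min(
            self._max_tokens,
            self._tokens + elapsed * self._tokens_per_second,
        )
        self._last_update_time = now

    def try_take(self, count=1):
        # Non-blocking: report whether `count` tokens were available.
        self._update()
        if self._tokens >= count:
            self._tokens -= count
            return True
        return False


class FakeClock:
    """Manually advanced clock, so the example is deterministic."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now


clock = FakeClock()
bucket = SketchTokenBucket(tokens_per_second=2, max_tokens=5,
                           starting_tokens=0, clock=clock)
print(bucket.try_take())   # → False (bucket starts empty)
clock.now = 1.0
print(bucket.try_take(2))  # → True (one second at 2 tokens/s)
```

Passing the clock in as a parameter is a design choice made for this sketch only: it keeps the refill arithmetic testable without sleeping or running an event loop.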

# Trio program that reads from stdin, uppercases everything, and then echoes
# it back to stdout.
# This uses some internal APIs, because Trio v0.7.0 doesn't have a real public
# API for talking to stdin/stdout yet. This only works on Unix.
# If you want to help implement a real API for this, or just check whether
# anything new has happened, see:
# https://github.com/python-trio/trio/issues/174
import trio

# http://pylint.pycqa.org/en/latest/how_tos/custom_checkers.html
import astroid
from pylint.checkers import BaseChecker
from pylint.interfaces import IAstroidChecker

def register(linter):
    linter.register_checker(MissingAwaitChecker(linter))

Trio is a friendly async concurrency library for Python that's obsessed with correctness and usability.
On the correctness side, one of the features that makes Trio unique is that when an exception occurs, it's never discarded: it always propagates until caught, and if it's not caught, then you automatically get a complete traceback, just like with regular Python. Errors should never pass silently!
But... in Trio v0.6.0 and earlier, these tracebacks also contained a lot of clutter showing details of Trio's internal plumbing, which made it difficult to see the parts that were relevant to your code. It's a small thing, but when it's midnight and you're debugging some nasty concurrency bug, it can make a big difference to have exactly the information you need, clearly laid out, without distractions.
And thanks to some hard work by @belm0, the upcoming Trio v0.7.0 will give you exactly that: clean tracebacks that show you wh

@asynccontextmanager
async def subscribe(host, port, subscription_info):
    # Make connection, with a 'with' block so we make sure to close it eventually
    async with await trio.open_tcp_stream(host, port) as stream:
        # Subscribe to whatever channel
        await msg_send(stream, subscription_info)
        # Start heartbeat task
        async with trio.open_nursery() as nursery:
            nursery.start_soon(heartbeat_sender, stream)
            # Done setting up this context manager. Return the message iterator object.