Skip to content

Instantly share code, notes, and snippets.

from byoio import receive_until, receive_some
async def decode_chunked(stream, buf):
while True:
header = await receive_until(stream, buf, b"\r\n")
to_read = int(header.strip()) # FIXME: proper validation
if to_read == 0:
# FIXME: read trailers
return
while to_read > 0:
import trio
WORKER_COUNT = 10
async def worker(receive_chan):
# Trick: each url gets its own clone of the send channel
# After processing a URL, we close its clone
# That way, when all the URLs are done, all the send clones will be
# closed, so the 'async for ... in receive_chan' will automatically exit.
async for send_chan, url in receive_chan:
import socket
import errno
import select
a, b = socket.socketpair(family=socket.AF_UNIX, type=socket.SOCK_DGRAM)
a.setblocking(False)
try:
while True:
a.send(b"hi")
import trio
import os
import json
from itertools import count
# https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/preview#
class Trace(trio.abc.Instrument):
def __init__(self, out):
self.out = out
~/trio/notes-to-self$ python sleep-time.py
----------------------------------------------------------------
Starting: import select; select.select([], [], [], 6)
Expected duration: 6 seconds
Putting it to sleep for 2.0 seconds
Waking it up again
Actual duration: 8.05
----------------------------------------------------------------
Starting: import select; ep = select.epoll(); ep.poll(6)
Expected duration: 6 seconds
# https://gitter.im/python-trio/general?at=5cddee9a6366992a94ce41c5
@asynccontextmanager
async def generalized_move_on_after():
async with trio.open_nursery() as nursery:
async def arbitrary_event():
await wait_for_some_arbitrary_event()
nursery.cancel_scope.cancel()
nursery.start_soon(arbitrary_event)
yield nursery.cancel_scope
nursery.cancel_scope.cancel()
# https://gitter.im/python-trio/general?at=5c6f46089155d45d905346b3
import trio
from functools import partial
class RenewableResource(trio.abc.AsyncResource):
def __init__(self, factory, *args):
self._factory = partial(factory, *args)
self._resource = None
import trio
from typing import Dict
class Resources:
def __init__(self, quantities: Dict[str, int]):
self.quantities = quantities
def __contains__(self, other):
for k, v in other.quantities:
@njsmith
njsmith / options.py
Created January 25, 2019 01:11
one way to approach python-trio/trio#470
from functools import partial, update_wrapper
def takes_callable(wrapped_fn):
def simple(fn, *args, **kwargs):
return wrapped_fn(partial(fn, *args, **kwargs))
update_wrapper(simple, wrapped_fn, assigned=["__module__", "__name__", "__qualname__", "__doc__"])
simple.with_options = wrapped_fn
return simple
@takes_callable
# One of several ways to handle a lot of jobs with a limit on how many run at once
# Context: https://gitter.im/python-trio/general?at=5c3d2643c45b986d116009bc
async def limited_concurrency(job_handler, jobs, max_at_once):
limiter = trio.Semaphore(max_at_once)
async def task_fn(job):
await job_handler(job)
limiter.release()
async with trio.open_nursery() as nursery: