This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from byoio import receive_until, receive_some | |
async def decode_chunked(stream, buf): | |
while True: | |
header = await receive_until(stream, buf, b"\r\n") | |
to_read = int(header.strip()) # FIXME: proper validation | |
if to_read == 0: | |
# FIXME: read trailers | |
return | |
while to_read > 0: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import trio | |
WORKER_COUNT = 10 | |
async def worker(receive_chan): | |
# Trick: each url gets its own clone of the send channel | |
# After processing a URL, we close its clone | |
# That way, when all the URLs are done, all the send clones will be | |
# closed, so the 'async for ... in receive_chan' will automatically exit. | |
async for send_chan, url in receive_chan: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import errno | |
import select | |
a, b = socket.socketpair(family=socket.AF_UNIX, type=socket.SOCK_DGRAM) | |
a.setblocking(False) | |
try: | |
while True: | |
a.send(b"hi") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import trio | |
import os | |
import json | |
from itertools import count | |
# https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/preview# | |
class Trace(trio.abc.Instrument): | |
def __init__(self, out): | |
self.out = out |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
~/trio/notes-to-self$ python sleep-time.py | |
---------------------------------------------------------------- | |
Starting: import select; select.select([], [], [], 6) | |
Expected duration: 6 seconds | |
Putting it to sleep for 2.0 seconds | |
Waking it up again | |
Actual duration: 8.05 | |
---------------------------------------------------------------- | |
Starting: import select; ep = select.epoll(); ep.poll(6) | |
Expected duration: 6 seconds |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# https://gitter.im/python-trio/general?at=5cddee9a6366992a94ce41c5
@asynccontextmanager
async def generalized_move_on_after():
    """Cancel the ``async with`` body once an arbitrary event occurs.

    A background task awaits ``wait_for_some_arbitrary_event()`` and then
    cancels the nursery's cancel scope, which also covers the body of the
    ``async with`` block that uses this context manager.

    Yields:
        The nursery's cancel scope, so the caller can inspect or cancel it.
    """
    async with trio.open_nursery() as nursery:

        async def arbitrary_event():
            await wait_for_some_arbitrary_event()
            nursery.cancel_scope.cancel()

        nursery.start_soon(arbitrary_event)
        yield nursery.cancel_scope
        # The body finished on its own: cancel the scope so the background
        # watcher stops waiting and the nursery can exit.
        nursery.cancel_scope.cancel()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# https://gitter.im/python-trio/general?at=5c6f46089155d45d905346b3 | |
import trio | |
from functools import partial | |
class RenewableResource(trio.abc.AsyncResource): | |
def __init__(self, factory, *args): | |
self._factory = partial(factory, *args) | |
self._resource = None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import trio | |
from typing import Dict | |
class Resources: | |
def __init__(self, quantities: Dict[str, int]): | |
self.quantities = quantities | |
def __contains__(self, other): | |
for k, v in other.quantities: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from functools import partial, update_wrapper | |
def takes_callable(wrapped_fn):
    """Let a function that expects one callable accept ``fn, *args, **kwargs``.

    The returned wrapper binds ``fn`` to the given positional and keyword
    arguments with ``functools.partial`` and passes the resulting callable to
    ``wrapped_fn`` as its only argument.

    Args:
        wrapped_fn: Function that is called with a single callable.

    Returns:
        The wrapper function. The undecorated ``wrapped_fn`` remains available
        as the wrapper's ``with_options`` attribute.
    """

    def simple(fn, *args, **kwargs):
        return wrapped_fn(partial(fn, *args, **kwargs))

    # Only the identifying metadata is copied; ``__annotations__`` is left out
    # of ``assigned`` (presumably because the wrapper's signature differs from
    # ``wrapped_fn``'s -- confirm with the author).
    update_wrapper(
        simple,
        wrapped_fn,
        assigned=["__module__", "__name__", "__qualname__", "__doc__"],
    )
    simple.with_options = wrapped_fn
    return simple
@takes_callable |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# One of several ways to handle a lot of jobs with a limit on how many run at once
# Context: https://gitter.im/python-trio/general?at=5c3d2643c45b986d116009bc
async def limited_concurrency(job_handler, jobs, max_at_once): | |
limiter = trio.Semaphore(max_at_once) | |
async def task_fn(job): | |
await job_handler(job) | |
limiter.release() | |
async with trio.open_nursery() as nursery: |