Skip to content

Instantly share code, notes, and snippets.

Nathaniel J. Smith njsmith

Block or report user

Report or block njsmith

Hide content and notifications from this user.

Learn more about blocking users

Contact Support about this user’s behavior.

Learn more about reporting abuse

Report abuse
View GitHub Profile
View trace-test.py
import trio
import os
import json
from itertools import count
# https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/preview#
class Trace(trio.abc.Instrument):
def __init__(self, out):
self.out = out
View linux-output.txt
~/trio/notes-to-self$ python sleep-time.py
----------------------------------------------------------------
Starting: import select; select.select([], [], [], 6)
Expected duration: 6 seconds
Putting it to sleep for 2.0 seconds
Waking it up again
Actual duration: 8.05
----------------------------------------------------------------
Starting: import select; ep = select.epoll(); ep.poll(6)
Expected duration: 6 seconds
View generalized-move-on-after.py
# https://gitter.im/python-trio/general?at=5cddee9a6366992a94ce41c5
@asynccontextmanager
async def generalized_move_on_after():
async with trio.open_nursery() as nursery:
async def arbitrary_event():
await wait_for_some_arbitrary_event()
nursery.cancel_scope.cancel()
nursery.start_soon(arbitrary_event)
yield nursery.cancel_scope
nursery.cancel_scope.cancel()
View renewable-resource.py
# https://gitter.im/python-trio/general?at=5c6f46089155d45d905346b3
import trio
from functools import partial
class RenewableResource(trio.abc.AsyncResource):
def __init__(self, factory, *args):
self._factory = partial(factory, *args)
self._resource = None
View resource-pool.py
import trio
from typing import Dict
class Resources:
def __init__(self, quantities: Dict[str, int]):
self.quantities = quantities
def __contains__(self, other):
for k, v in other.quantities:
View options.py
from functools import partial, update_wrapper
def takes_callable(wrapped_fn):
def simple(fn, *args, **kwargs):
return wrapped_fn(partial(fn, *args, **kwargs))
update_wrapper(simple, wrapped_fn, assigned=["__module__", "__name__", "__qualname__", "__doc__"])
simple.with_options = wrapped_fn
return simple
@takes_callable
View limited-concurrency-using-semaphore.py
# One of several ways to handle a lot of jobs with a limit on how many run at once
# Context: https://gitter.im/python-trio/general?at=5c3d2643c45b986d116009bc
async def limited_concurrency(job_handler, jobs, max_at_once):
limiter = trio.Semaphore(max_at_once)
async def task_fn(job):
await job_handler(job)
limiter.release()
async with trio.open_nursery() as nursery:
View serve_unix_domain.py
# This ignores some issues that a real production-quality version should
# address; see https://github.com/python-trio/trio/issues/279 for
# the full details. But it should handle simple cases OK.
async def open_unix_domain_listeners(path, *, permissions=None):
sock = trio.socket.socket(trio.socket.AF_UNIX, trio.socket.SOCK_STREAM)
try:
os.unlink(path)
except OSError:
pass
View communicate.py
async def communicate(self, stdin):
stdout = []
stderr = []
async def receive_all_into(pipe, buf):
while True:
data = await pipe.receive_some(LIMIT)
if not data:
break
buf.append(data)
View monkeypatch-h11-header-case.py
import h11
def write_headers_titlecase(headers, write):
for name, value in headers:
if name == b"host":
write(b"%s: %s\r\n" % (name.title(), value))
for name, value in headers:
if name != b"host":
write(b"%s: %s\r\n" % (name.title(), value))
write(b"\r\n")
You can’t perform that action at this time.