Skip to content

Instantly share code, notes, and snippets.

@types.coroutine
def callcc(func):
"""The almighty call-with-current-continuation, now in coroutine form.
This function returns an awaitable object, so it must be called
like this:
retval = await callcc(some_func)
Then if some_func looks like this:
def some_func(return_cb, throw_cb):
async def reader(chan, return_on=None, tag="got val"):
while True:
val = await chan.recv()
print(tag, val)
if return_on is not None and val == return_on:
print("returning", val)
return val
@catern
catern / async.py
Last active January 29, 2018 19:56
import os
from typing import Callable, Any
import contextlib
import socketio
from contextlib import contextmanager
import types
@types.coroutine
def callcc(func):
"""The almighty call-with-current-continuation, now in coroutine form.
import typing as t
import types
class SetOnlyOnce:
"""Wraps functions and ensures only one is called.
A simple stateful class that can wrap any number of functions, and ensures
that only one of them is ever called, and only once.
"""
{ stdenv, buildPythonPackage, fetchzip
, pytest, pytest-asyncio, pytestcov }:
buildPythonPackage rec {
pname = "aioconsole";
version = "0.1.7";
name = "${pname}-${version}";
# tests are not included in the version on PyPI
src = fetchzip {
sbaugh@sbaugh-sandbox ~/.local/src/nixmono $ cat .gitmodules
[submodule "nixpkgs"]
path = nixpkgs
url = /home/sbaugh/.local/src/nixpkgs/
sbaugh@sbaugh-sandbox ~/.local/src/nixmono $ ls -l
total 16
drwx------ 3 sbaugh sbaugh 4096 Mar 20 15:13 foo
-rw-r--r-- 1 sbaugh sbaugh 293 Mar 20 15:12 foo.tar.gz
drwxr-xr-x 2 sbaugh sbaugh 4096 Mar 20 16:21 nixpkgs
-rw-r--r-- 1 sbaugh sbaugh 2453 Mar 20 16:15 notes.org
nix-env -iE 'channels: with channels.nixpkgs {}; symlinkJoin {name = "utillinux"; paths = [ utillinux utillinux.man ]; }'
Traceback (most recent call last):
File "/nix/store/6f36kqsj1yahwv1ihcvwgi0ccgvkdfcj-python3.7-trio-0.7.0/lib/python3.7/site-packages/trio/_core/_run.py", line 1314, in run
run_impl(runner, async_fn, args)
File "/nix/store/6f36kqsj1yahwv1ihcvwgi0ccgvkdfcj-python3.7-trio-0.7.0/lib/python3.7/site-packages/trio/_core/_run.py", line 1378, in run_impl
runner.io_manager.handle_io(timeout)
File "/nix/store/6f36kqsj1yahwv1ihcvwgi0ccgvkdfcj-python3.7-trio-0.7.0/lib/python3.7/site-packages/trio/_core/_io_epoll.py", line 68, in handle_io
events = self._epoll.poll(timeout, max_events)
OSError: [Errno 0] Error
The above exception was the direct cause of the following exception:
# using classes and factory methods requires boilerplate - make_element
class Element:
def __init__(self, directory, data):
self.directory = directory
self.data = data
def frobnicate(self):
dothing(self.directory, self.data)
class Toplevel:
def make_toplevel(directory):
class Element:
def __init__(self, data):
self.data = data
def frobnicate(self):
dothing(directory, self.data)
def return_things():
return [Element("foo"), Element("bar")]