Skip to content

Instantly share code, notes, and snippets.

@tkf
Created September 12, 2019 07:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tkf/e6b3de67571162f8aac690c8b8f7ee7a to your computer and use it in GitHub Desktop.
Save tkf/e6b3de67571162f8aac690c8b8f7ee7a to your computer and use it in GitHub Desktop.
from dataclasses import dataclass
from typing import Any
import trio
async def start_cancelable(nursery, async_fn, *args):
    """Start ``async_fn(*args)`` in ``nursery`` wrapped in its own cancel scope.

    The child task is launched through ``nursery.start``.  It opens a
    ``trio.CancelScope``, reports that scope via ``task_status.started`` and
    only then awaits ``async_fn(*args)`` inside the scope.

    Args:
        nursery: Object whose ``start`` method launches the child task.
        async_fn: Async callable to run inside the cancel scope.
        *args: Positional arguments forwarded to ``async_fn``.

    Returns:
        Whatever ``nursery.start`` returns, i.e. the cancel scope passed to
        ``task_status.started``.
    """

    async def run_in_scope(task_status):
        with trio.CancelScope() as cancel_scope:
            # Hand the scope to the starter before doing the actual work.
            task_status.started(cancel_scope)
            await async_fn(*args)

    started_scope = await nursery.start(run_in_scope)
    return started_scope
@dataclass
class Reduced:
    """Wrapper marking ``value`` as a final result of a reduction.

    ``_reduce`` tests for this type with ``isinstance`` to stop applying the
    step function and to cancel the scopes it was given.
    """

    # The wrapped result.
    value: Any
async def _reduce(scopes, op, init, xs):
if len(xs) > 2:
mid = len(xs) // 2
xs1 = xs[:mid]
xs2 = xs[mid:]
acc2 = null = object()
async with trio.open_nursery() as nursery:
async def background():
nonlocal acc2
acc2 = await _reduce(scopes, op, init, xs2)
scope2 = await start_cancelable(nursery, background)
acc1 = await _reduce(scopes + [scope2], op, init, xs1)
if acc2 is null:
print("Canceled processing", xs2)
if isinstance(acc1, Reduced) or acc2 is null:
return acc1
elif isinstance(acc2, Reduced):
return acc2
else:
acc = await op(acc1, acc2)
else:
assert len(xs) in (1, 2)
acc = await op(init, xs[0])
if len(xs) == 2 and not isinstance(acc, Reduced):
acc = await op(acc, xs[1])
if isinstance(acc, Reduced):
print("Canceling", len(scopes), "scopes")
for s in scopes:
print("Cancel:", s)
s.cancel()
return acc
async def reduce(op, init, xs):
    """Reduce ``xs`` with the async step function ``op`` starting from ``init``.

    Public entry point: delegates to ``_reduce`` with an empty list of cancel
    scopes and returns its result.
    """
    no_scopes = []
    result = await _reduce(no_scopes, op, init, xs)
    return result
def sleepy(delay):
    """Build a wrapper that turns a sync callable into a delayed async one.

    Args:
        delay: Value passed to ``trio.sleep`` before each call of the wrapped
            function.

    Returns:
        A function that takes a callable and returns an async function which
        first awaits ``trio.sleep(delay)`` and then returns the result of
        calling the wrapped callable with the same arguments.
    """

    def make_async(func):
        async def delayed(*args, **kwargs):
            await trio.sleep(delay)
            result = func(*args, **kwargs)
            return result

        return delayed

    return make_async
def finder(needle):
    """Return a step function that ends the reduction when it sees ``needle``.

    The step function ignores its accumulator: it returns ``Reduced(x)`` when
    ``x == needle`` and ``x`` itself otherwise.
    """

    def step(acc, x):
        return Reduced(x) if x == needle else x

    return step
def tracer(record):
    """Return a transducer that logs each step call into ``record``.

    Args:
        record: List-like object; an ``(acc, x)`` tuple is appended to it
            before every call of the wrapped step function.

    Returns:
        A function that takes a step function ``op`` and returns a new step
        function with the same ``(acc, x)`` signature and return value.
    """

    def transducer(op):
        def traced(acc, x):
            call = (acc, x)
            record.append(call)
            return op(acc, x)

        return traced

    return transducer
def applyfs(*args):
    """Apply a chain of unary functions to a value, right to left.

    The last argument is the starting value; the arguments before it are
    callables applied from the last one to the first, so ``applyfs(f, g, x)``
    evaluates ``f(g(x))``.  With a single argument that argument is returned.

    Raises:
        IndexError: If called without any arguments.
    """
    value = args[-1]
    for fn in reversed(args[:-1]):
        value = fn(value)
    return value
async def recording_find(needle, delay, xs=range(2 ** 3)):
    """Search ``xs`` for ``needle`` with ``reduce`` while recording each step.

    Args:
        needle: Value the step function built by ``finder`` looks for.
        delay: Delay handed to ``sleepy``, applied before every step call.
        xs: Input sequence; defaults to ``range(8)``.

    Returns:
        A tuple ``(acc, record)``: the result of ``reduce`` and the list of
        ``(acc, x)`` argument pairs collected by ``tracer``.
    """
    record = []
    # bottom step function:
    step = finder(needle)
    # non-async-op -to- non-async-op transducer(s):
    step = tracer(record)(step)
    # non-async-op -to- async-op transducer:
    step = sleepy(delay)(step)
    acc = await reduce(step, None, xs)
    return acc, record
if __name__ == "__main__":
    # Demo run: search the default input for needle 0, with a delay of 0.1
    # passed to ``sleepy`` for every step call.
    acc, record = trio.run(recording_find, 0, 0.1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment