Skip to content

Instantly share code, notes, and snippets.

@nonsleepr
Created July 5, 2018 21:13
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nonsleepr/43420a0ce659f1b870544fffb9e5cda4 to your computer and use it in GitHub Desktop.
Save nonsleepr/43420a0ce659f1b870544fffb9e5cda4 to your computer and use it in GitHub Desktop.
Multiplexer for trio: my take on python-trio/trio#467
import trio
from multiplexer import Multiplexer
async def reader(mx, key, timeout=100):
    """Wait for ``key`` to show up on ``mx`` and report what happened.

    Args:
        mx: Object whose ``mx[key]`` is awaitable (a ``Multiplexer``).
        key: Key to wait for.
        timeout: Seconds to wait before giving up (passed to
            ``trio.fail_after``).

    A timeout is only reported on stdout; any other exception is printed
    and then re-raised to the caller.
    """
    print(f'Waiting for "{key}"...')
    try:
        with trio.fail_after(timeout):
            result = await mx[key]
        print(f'Got value "{result}" for key {key}')
    except trio.TooSlowError:
        # Deadline expired: swallow it, this reader simply stops waiting.
        print(f"Don't want to wait any longer for {key}")
    except Exception as err:
        print(f'Ouch! {err}')
        raise err
async def writer(mx):
    """Store ``k * 10`` under each key ``k`` in ``range(10)``.

    Sleeps 0.2 seconds before every write so that readers get a chance to
    start waiting first.
    """
    for slot in range(10):
        await trio.sleep(0.2)
        print(f'Writing {slot}...')
        mx[slot] = slot * 10
async def runner():
    """Run three readers and one ``writer`` concurrently on a shared Multiplexer.

    Readers wait for keys 15 (with a 5 second timeout), 7 and 3 (both with
    ``reader``'s default timeout).
    """
    async with trio.open_nursery() as nursery:
        mx = Multiplexer()
        # Positional arguments after ``mx`` for each reader: (key[, timeout]).
        for reader_args in ((15, 5), (7,), (3,)):
            nursery.start_soon(reader, mx, *reader_args)
        nursery.start_soon(writer, mx)
def test_multiplexer():
    """Run the ``runner`` scenario to completion under ``trio.run``."""
    trio.run(runner)
async def writer2(mx, errors):
    """Write keys 0..9 like ``writer``, but fail the keys listed in ``errors``.

    Args:
        mx: The Multiplexer to publish to.
        errors: Container of keys for which ``mx.set_exception`` is called
            with ``Exception('Ka-Boom!')`` instead of storing a value.
    """
    for slot in range(10):
        await trio.sleep(0.2)
        print(f'Writing {slot}...')
        if i_is_ok := slot not in errors:
            mx[slot] = slot * 10
            continue
        mx.set_exception(slot, Exception('Ka-Boom!'))
async def runner2():
    """Run three readers against ``writer2``, which fails keys 7 and 9.

    Readers wait for keys 15 (with a 5 second timeout), 7 and 3 (both with
    ``reader``'s default timeout).
    """
    async with trio.open_nursery() as nursery:
        mx = Multiplexer()
        # Positional arguments after ``mx`` for each reader: (key[, timeout]).
        for reader_args in ((15, 5), (7,), (3,)):
            nursery.start_soon(reader, mx, *reader_args)
        nursery.start_soon(writer2, mx, (7,9))
def test_multiplexer_with_error():
    """Run the ``runner2`` scenario (writer injects errors) under ``trio.run``.

    NOTE(review): ``reader`` re-raises whatever exception it receives, so the
    injected error presumably propagates out of ``trio.run`` here -- confirm
    whether this test is meant to raise.
    """
    trio.run(runner2)
import trio
import outcome
class Multiplexer(object):
    """Keyed rendezvous between trio tasks.

    A task does ``await mx[key]`` to obtain the value for ``key``; if the
    value has not been stored yet the task sleeps until another task stores
    it with ``mx[key] = value`` (or delivers an error with
    ``set_exception`` / ``fail``).

    Stored values stay in ``_data`` (they are never removed by this class).
    At most one task may wait on a given key at a time.
    """

    def __init__(self):  # TODO: Add capacity
        # key -> value stored through ``__setitem__``.
        self._data = {}
        # key -> the single task currently parked in ``__getitem__``.
        self._get_wait = {}

    @staticmethod
    def _lowlevel():
        """Return trio's low-level API module.

        ``trio.hazmat`` was renamed to ``trio.lowlevel``; prefer the new name
        and fall back to the old one. Resolved lazily so that merely defining
        the class does not touch trio.
        """
        return getattr(trio, "lowlevel", None) or trio.hazmat

    def _forget_waiter(self, key, task):
        """Drop ``task``'s registration for ``key`` if it is still the waiter."""
        # Identity check: never remove a registration that belongs to a
        # different task which started waiting on the same key later.
        if self._get_wait.get(key) is task:
            del self._get_wait[key]

    def get_nowait(self, key):
        """Return the value stored for ``key`` without blocking.

        Raises:
            trio.WouldBlock: if nothing has been stored for ``key``.
        """
        if key in self._data:
            return self._data[key]
            # TODO: Should we delete the value after?
        raise trio.WouldBlock

    async def __getitem__(self, key):
        """Return the value for ``key``, sleeping until it is available.

        Raises:
            RuntimeError: if another task is already waiting for ``key``.
            Exception: whatever was delivered through ``set_exception`` or
                ``fail`` while this task was waiting.
        """
        ll = self._lowlevel()
        # Cancellation half of a checkpoint: bail out early if this task is
        # already cancelled. The scheduling half is done below on the fast
        # path so that every call is a full checkpoint.
        await ll.checkpoint_if_cancelled()
        try:
            value = self.get_nowait(key)
        except trio.WouldBlock:
            pass
        else:
            await ll.cancel_shielded_checkpoint()
            return value

        # Must wait. Only one waiter per key is supported: silently replacing
        # an existing waiter would leave it asleep forever.
        if key in self._get_wait:
            raise RuntimeError(
                f'another task is already waiting for key {key!r}'
            )

        task = ll.current_task()

        def abort_fn(_):
            # Called by trio when the sleeping task is cancelled.
            self._forget_waiter(key, task)
            return ll.Abort.SUCCEEDED

        self._get_wait[key] = task
        try:
            return await ll.wait_task_rescheduled(abort_fn)
        finally:
            self._forget_waiter(key, task)

    def __setitem__(self, key, value):
        """Store ``value`` under ``key`` and wake the task waiting for it.

        If ``value`` is an ``outcome.Outcome`` it is handed to the waiter
        as-is (so an ``outcome.Error`` raises in the waiter); anything else
        is wrapped in ``outcome.Value``. ``value`` is stored in ``_data``
        unchanged either way.
        """
        self._data[key] = value
        # Pop rather than get: a task must not be rescheduled twice, which
        # would happen if the key were written again before the waiter ran.
        task = self._get_wait.pop(key, None)
        if task is None:
            return
        if isinstance(value, outcome.Outcome):
            result = value
        else:
            result = outcome.Value(value)
        self._lowlevel().reschedule(task, result)

    def set_exception(self, key, exc):
        """Raise ``exc`` in the task currently waiting for ``key``.

        Nothing is stored: if no task is waiting, this is a no-op.
        """
        task = self._get_wait.pop(key, None)
        if task is not None:
            self._lowlevel().reschedule(task, outcome.Error(exc))

    def fail(self, exc):
        """Raise ``exc`` in every task currently waiting, whatever its key."""
        # Detach all waiters first so none of them can be rescheduled again.
        waiters = list(self._get_wait.values())
        self._get_wait.clear()
        for task in waiters:
            self._lowlevel().reschedule(task, outcome.Error(exc))

    async def set(self, key, value):
        """Async counterpart of ``mx[key] = value`` that is also a checkpoint.

        The ``trio.WouldBlock`` branch is a placeholder for a future bounded
        capacity (see the TODO on ``__init__``); ``__setitem__`` never raises
        it today, so the trailing ``raise`` is not reached.
        """
        ll = self._lowlevel()
        await ll.checkpoint_if_cancelled()
        try:
            self[key] = value
        except trio.WouldBlock:
            pass
        else:
            await ll.cancel_shielded_checkpoint()
            return
        raise Exception('async set is not fully implemented')
@smurfix
Copy link

smurfix commented Oct 9, 2018

if isinstance(value, outcome.Outcome):

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment