Skip to content

Instantly share code, notes, and snippets.

@njsmith
Last active November 28, 2018 03:35
Show Gist options
  • Save njsmith/40b7b7f65e5f433789153c7b668ce643 to your computer and use it in GitHub Desktop.
Save njsmith/40b7b7f65e5f433789153c7b668ce643 to your computer and use it in GitHub Desktop.
from collections import deque, OrderedDict
import trio
class MultiGetQueue:
def __init__(self, max_size):
self._max_size = max_size
# {task: abort func}
# probably should make Task._abort_func public, or maybe even change
# the reschedule code to call the abort func in general?
self._get_wait = OrderedDict()
# {task: queued value}
self._put_wait = OrderedDict()
# invariants:
# if len(self._q) < self._max_size, then self._put_wait is empty
# if len(self._q) > 0, then self._get_wait is empty
self._q = deque()
async def put(self, obj):
await trio.hazmat.yield_if_cancelled()
if self._get_wait:
assert not self._q
task, abort_fn = self._get_wait.popitem(last=False)
abort_fn()
trio.hazmat.reschedule(task, trio.Value((self, obj)))
await trio.hazmat.yield_briefly_no_cancel()
elif len(self._q) < self._max_size:
self._q.append(obj)
await trio.hazmat.yield_briefly_no_cancel()
else:
task = trio.current_task()
self._put_wait[task] = obj
def abort_fn(_):
del self._put_wait[task]
return trio.hazmat.Abort.SUCCEEDED
await trio.hazmat.yield_indefinitely(abort_fn)
async def get(self):
_, value = await multi_get([self])
return value
async def multi_get(queues):
# Returns (queue object, value gotten)
await trio.hazmat.yield_if_cancelled()
for queue in queues:
if queue._put_wait:
task, value = queue._put_wait.popitem(last=False)
# No need to check max_size, b/c we'll pop an item off again right
# below.
queue._q.append(value)
trio.hazmat.reschedule(task)
if queue._q:
value = queue._q.popleft()
await trio.hazmat.yield_briefly_no_cancel()
return value
# No queue had anything.
task = trio.current_task()
def abort_fn(_):
for queue in queues:
del queue._get_wait[task]
return trio.hazmat.Abort.SUCCEEDED
for queue in queues:
queue._get_wait[task] = abort_fn
return await trio.hazmat.yield_indefinitely(abort_fn)
@njsmith
Copy link
Author

njsmith commented Sep 6, 2017

@sorcio: doh, yeah, those yields were totally in the wrong place. That's what happen when I write too fast :-). Fixed.

The above prioritizes queues in the order they're passed to multi_get, which is what @parity3 specifically wanted for their use case :-), and then is FIFO-fair on each individual queue. Golang uses simple randomization.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment