Skip to content

Instantly share code, notes, and snippets.

@parity3
Last active September 5, 2017 06:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save parity3/10890d36b098e40a8cae24081d78e614 to your computer and use it in GitHub Desktop.
Save parity3/10890d36b098e40a8cae24081d78e614 to your computer and use it in GitHub Desktop.
trio wait on multiple queues
import collections, operator, datetime, importlib
if 0:
trio = module()
trio.run = None # type: callable
trio.WouldBlock = None
trio.Abort = None # type: abort_class
trio.current_task = None # type: callable
# STUBS for PyCharm
class abort_class:
SUCCEEDED = None
class ParkingLot_class:
park = None # type: callable
hazmat = module()
hazmat.ParkingLot = None # type: callable -> ParkingLot_class
hazmat.yield_indefinitely = None # type: callable
hazmat.reschedule = None # type: callable
else:
globals()['trio'] = _ = importlib.import_module("trio")
globals()['hazmat'] = _.hazmat
del _
class QueueNotOpenException(Exception):
pass
class WaiterEntry(object):
__slots__ = 'task', 'still_waiting', 'data', 'queue', 'aborted'
def __init__(self):
self.task = trio.current_task()
self.still_waiting = True
self.data = None
self.queue = None
self.aborted = True
def assign_data_and_wakeup(self,data,q : "DispatcherQueue"):
self.data = data
self.still_waiting = False
self.queue = q
hazmat.reschedule(self.task)
class Dispatcher:
"""
Factory for queues which allows for a get() to be run on multiple queues, prioritizing any ready data to the first
passed-in queue.
Queues created by this factory have one caveat: they MUST be close()'d after they are no longer in use.
Calling put() on a closed queue will generate a warning. Calling get() on a closed queue will raise a QueueNotOpenException.
A race-condition can happen where a canceled task can have data ready for it that must be requeued. Because of this,
the order (usually FIFO) of the queue data cannot be guaranteed.
"""
MAX_EVENT_WAITS = 10000
MAX_QUEUES = 1000
QueueNotOpenException = QueueNotOpenException
def __init__(self):
self.waiters_for_queues = {}
def log_func_off(self, *args):
pass
def log_func_on(self, *args):
print("{} - Dispatcher:".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')), *args)
log_func = log_func_off
warn_func = log_func_on
def queue(self) -> "DispatcherQueue":
q = DispatcherQueue(self)
num_queues = len(self.waiters_for_queues)
if num_queues >= self.MAX_QUEUES:
raise RuntimeError(f"number of queues exceeded, please close some: {num_queues+1} > {self.MAX_QUEUES}")
self.waiters_for_queues[q] = []
self.log_func(f"queue() created new, now at: {num_queues}")
return q
def close_queue(self, q: "DispatcherQueue"):
# must be called when queue no longer in use
rsp = self.waiters_for_queues.pop(q, None)
if rsp is None:
self.log_func(f"WARNING: close_queue() NOT removed, at: {len(self.waiters_for_queues)}")
else:
self.log_func(f"close_queue() removed, now at: {len(self.waiters_for_queues)}")
def get_nowait(self, *queues):
try:
ready_data = next(filter(None, map(operator.attrgetter('ready_data'), queues)))
except StopIteration:
raise trio.WouldBlock("queues are empty")
return ready_data.popleft()
async def get(self, *queues):
if not queues:
raise AssertionError("must be called with at least 1 queue")
try:
ready_data = next(filter(None, map(operator.attrgetter('ready_data'),queues)))
except StopIteration:
pass
else:
return ready_data.popleft()
entry = WaiterEntry()
# get the waiter entry lists for each queue
try:
qlists = list(map(self.waiters_for_queues.__getitem__, queues)) # type: list
except KeyError:
raise QueueNotOpenException('cannot get() on unknown or closed queue')
# register ourselves as a waiter entry for each of the queues we are interested in
collections.deque(map(operator.methodcaller('append', entry), qlists), 0)
try:
# have faith we will be woken up by a put or put_nowait
# reschedule() on this task can only be called once so there is no need to do anything synchronously in the abort_fn callback
await hazmat.yield_indefinitely(abort_fn=lambda *args: trio.Abort.SUCCEEDED)
if entry.still_waiting:
raise AssertionError("somehow rescheduled but no data waiting for me")
entry.aborted = False # if we got here everything is ok
return entry.data
finally:
if entry.aborted and not entry.still_waiting:
self.warn_func(f"WARNING: we were canceled and assigned data was never delivered!")
# Make sure the data is at least requeued so it is not lost.
self.requeue(entry.queue,entry.data)
# unregister myself from queues I added myself to
try:
collections.deque(map(operator.methodcaller('remove', entry), qlists), 0)
except ValueError:
raise AssertionError("someone else removed me from queue's waiter entries")
def requeue(self,q : "DispatcherQueue", data):
"""hazmat territory.
Something must have gone wrong in the caller so we have to put this data somewhere so it isn't lost
Try to ensure it isn't out of order by putting it in front of queue."""
try:
waiters = self.waiters_for_queues[q] # type: list
except KeyError:
# since we are being called from get(), if queue is closed we are done
return
for wentry in waiters: # type: WaiterEntry
if wentry.still_waiting:
# get() will guarantee this data isn't lost.
# Assign it to the task, wake it up and move on.
# It will handle unregistration.
wentry.assign_data_and_wakeup(data,q)
break
else:
# no waiters need more data. save it for later.
# also skip the usual size check; we are in sync land and we can't raise WouldBlock.
q.ready_data.appendleft(data)
def assign_to_waiter(self, q: "DispatcherQueue", data):
if q.ready_data:
# There are no waiters if we haven't delivered previous ready_data
q.ready_data.append(data)
else:
try:
waiters = self.waiters_for_queues[q] # type: list
except KeyError:
# raise QueueNotOpenException("cannot put() to unknown or closed queue")
self.warn_func(f"WARNING: put() on closed queue; throwing away data: {data}")
return
for wentry in waiters: # type: WaiterEntry
if wentry.still_waiting:
# get() will guarantee this data isn't lost.
# Assign it to the task, wake it up and move on.
# It will handle unregistration.
wentry.assign_data_and_wakeup(data,q)
break
else:
# no waiters need more data. save it for later.
q.ready_data.append(data)
class DispatcherQueue:
MAX_EVENT_WAITS = 10000
def __init__(self, dispatcher: Dispatcher):
# when this queue fills up, a get is responsible for triggering this event once the queue has room for more data
self.has_free_slots = hazmat.ParkingLot()
# the actual data on this queue that was put here but remains to be consumed
self.ready_data = collections.deque()
self.max_size = 10
self.dispatcher = dispatcher
def __len__(self):
return len(self.ready_data)
async def put(self, data):
if len(self.ready_data) >= self.max_size:
# have faith we will be resurrected later by a get() or get_nowait()
await self.has_free_slots.park()
self.dispatcher.assign_to_waiter(self, data)
def put_nowait(self, data):
if len(self.ready_data) >= self.max_size:
raise trio.WouldBlock("max size reached")
self.dispatcher.assign_to_waiter(self, data)
async def get(self):
try:
rd = self.ready_data.popleft()
if len(self.ready_data) == self.max_size - 1:
self.has_free_slots.unpark()
return rd
except IndexError:
pass # no data is immediately available. Subcsribe and wait.
return await self.dispatcher.get(self)
def get_nowait(self):
try:
rd = self.ready_data.popleft()
if len(self.ready_data) == self.max_size - 1:
self.has_free_slots.unpark()
return rd
except IndexError:
raise trio.WouldBlock("no ready_data for queue")
def qsize(self):
return len(self.ready_data)
def close(self):
self.dispatcher.close_queue(self)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment