Skip to content

Instantly share code, notes, and snippets.

@yeus
Created February 23, 2022 21:18
Show Gist options
  • Save yeus/1d83f40e14633532c7cdda7dff99cb33 to your computer and use it in GitHub Desktop.
Save yeus/1d83f40e14633532c7cdda7dff99cb33 to your computer and use it in GitHub Desktop.
UniqueDynamicPriorityQueue - dynamically modifiable unique priority task queue
class UniqueDynamicPriorityQueue(asyncio.Queue):
    """A subclass of Queue; retrieves entries in priority order (lowest first).

    Entries are put as tuples of the form ``(priority number, data)``;
    retrieval returns only the data.  Entries are unique: when the same data
    is pushed twice, the new entry replaces the old one.  Because the
    priority of a queued entry can be changed this way, this is a "dynamic"
    priority queue.

    The item most recently handed out is remembered in ``processed_item``
    until ``task_done()`` is called.  If another retrieval happens before
    that, the unacknowledged item is first put back with its original
    priority (so it may be returned again).

    This class is based on the priority queue implementation from here:
    https://docs.python.org/3/library/heapq.html#priority-queue-implementation-notes

    NOTE(review): replacing an existing entry through ``put()`` presumably
    still counts as an additional unfinished task for ``join()`` -- confirm
    against the asyncio.Queue implementation before relying on ``join()``.
    """

    def _init(self, maxsize):
        """Set up the heap and bookkeeping (hook called by the base class)."""
        self._queue = []  # list of entries arranged in a heap
        self.entry_finder = {}  # mapping of tasks to entries
        self.REMOVED = '<removed-task>'  # placeholder for a removed task
        self.counter = itertools.count()  # unique sequence count
        self.processed_item = None  # item handed out, not yet task_done()
        self._processed_priority = None  # priority processed_item had

    def qsize(self):
        """Return the number of live entries.

        The heap may still hold REMOVED placeholders, so the live entries
        are counted through ``entry_finder`` instead of the heap itself.
        """
        return len(self.entry_finder)

    def empty(self):
        """Return True if there is no live entry left to retrieve."""
        return not self.entry_finder

    def _put(self, item):
        """Queue ``item``, a ``(priority, data)`` pair, replacing equal data."""
        self.add_task(priority=item[0], item=item[1])

    def _get(self):
        """Pop and return the data of the lowest-priority live entry.

        If the previously returned item was never acknowledged with
        ``task_done()``, it is re-queued first with its original priority.

        Raises:
            KeyError: if there is no live entry.
        """
        unfinished = self.processed_item
        if unfinished is not None:
            self.processed_item = None
            # add_task() is used directly: put() is a coroutine and would
            # need to be awaited, which is not possible in this sync hook.
            # A copy queued in the meantime wins over the stale one.
            if unfinished not in self.entry_finder:
                self.add_task(self._processed_priority, unfinished)
        priority, item = self._pop_entry()
        self.processed_item = item
        self._processed_priority = priority
        return item

    def get_item_priority(self, item):
        """Return the priority of a queued item. Raise KeyError if not found."""
        return self.entry_finder[item][0]

    def add_task(self, priority, item):
        """Add a new task or update the priority of an existing task."""
        if item in self.entry_finder:
            self.remove_task(item)
        count = next(self.counter)
        # The unique count breaks priority ties, so items are never compared.
        entry = [priority, count, item]
        self.entry_finder[item] = entry
        heapq.heappush(self._queue, entry)

    def remove_task(self, item):
        """Mark an existing task as REMOVED. Raise KeyError if not found."""
        entry = self.entry_finder.pop(item)
        # The entry stays in the heap; the marker makes the pop skip it.
        entry[-1] = self.REMOVED

    def _pop_entry(self):
        """Pop the lowest-priority live entry and return ``(priority, item)``.

        Raises:
            KeyError: if the heap holds no live entry.
        """
        while self._queue:
            priority, _count, item = heapq.heappop(self._queue)
            if item is not self.REMOVED:
                del self.entry_finder[item]
                return priority, item
        raise KeyError('pop from an empty priority queue')

    def pop_task(self):
        """Remove and return the lowest priority task. Raise KeyError if empty."""
        return self._pop_entry()[1]

    def task_done(self):
        """Acknowledge the item most recently returned by a retrieval.

        Clears ``processed_item`` so the item is not re-queued by the next
        retrieval.  If the same item was queued again while it was being
        processed, that queued copy is discarded as well.
        """
        super().task_done()
        finished = self.processed_item
        self.processed_item = None
        self._processed_priority = None
        # The popped item is normally no longer in entry_finder, so only
        # remove it when it has been queued again in the meantime.
        if finished is not None and finished in self.entry_finder:
            self.remove_task(finished)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment