Skip to content

Instantly share code, notes, and snippets.

@macieksk
Last active August 5, 2023 02:54
Show Gist options
  • Save macieksk/9743413 to your computer and use it in GitHub Desktop.
Save macieksk/9743413 to your computer and use it in GitHub Desktop.
Unique priority queues based on the PriorityQueue class. These priority queues accept tuples as input, where the first component is a priority, the second component is a key used to identify unique values, and the third and any further components are (optional) values.
# -*- coding: utf-8 -*-
#Unique priority queues in Python
#@author: "Maciek Sykulski"<macieksk@gmail.com>
#@blog: http://breakthecircles.blogspot.com/2013/10/unique-priority-queues-in-python.html
from Queue import PriorityQueue
import heapq
class UniquePriorityQueue(PriorityQueue):
    """Priority queue that keeps at most one pending item per key.

    Items are tuples ``(priority, key, ...)``.  While an item with a given
    key (``item[1]``) is waiting in the queue, further puts with the same
    key are silently dropped.  The key becomes available again as soon as
    its item has been retrieved.
    """

    def _init(self, maxsize):
        """Create the underlying heap and the set of keys currently queued."""
        PriorityQueue._init(self, maxsize)
        # Keys (item[1]) of the items that are currently in the heap.
        self.values = set()

    def _put(self, item, heappush=heapq.heappush):
        """Push ``item`` unless an item with the same key is already queued.

        Called by ``put()`` with the queue mutex held.
        """
        key = item[1]
        if key in self.values:
            # put() adds one to unfinished_tasks right after this hook
            # returns.  A dropped duplicate will never be handed out by
            # get(), so cancel that increment here; otherwise join() would
            # wait forever for a task_done() that cannot happen.
            self.unfinished_tasks -= 1
            return
        self.values.add(key)
        # Push directly instead of delegating to PriorityQueue._put: the
        # base hook only accepts a ``heappush`` argument on Python 2.
        heappush(self.queue, item)

    def _get(self, heappop=heapq.heappop):
        """Pop the lowest-priority item and free its key for future puts."""
        item = heappop(self.queue)
        self.values.remove(item[1])
        return item
class UniquePriorityQueueWithReplace(PriorityQueue):
    """Priority queue with one live item per key; a new put replaces it.

    Items are tuples ``(priority, key, ...)``.  Putting an item whose key
    (``item[1]``) is already pending supersedes the pending item.  The old
    heap entry is not removed immediately: it stays in the heap as a stale
    entry and is discarded when it reaches the top during ``get()``.

    Bookkeeping:

    * ``values`` maps each key to a three-element list
      ``[serial of the live entry, entries still in the heap,
      live entry still pending?]``.
    * ``size_diff`` counts the stale entries still in the heap, so that
      ``qsize()`` reports only live items.
    """

    def _init(self, maxsize):
        """Create the underlying heap and the per-key bookkeeping."""
        PriorityQueue._init(self, maxsize)
        self.values = dict()
        self.size_diff = 0

    def _put(self, item, heappush=heapq.heappush):
        """Push ``item``, superseding any pending item with the same key.

        Called by ``put()`` with the queue mutex held.
        """
        key = item[1]
        validity = self.values.get(key)
        if validity is None:
            validity = self.values[key] = [1, 1, True]
        else:
            validity[0] += 1  # serial number of the (new) live entry
            validity[1] += 1  # total entries for this key in the heap
            if validity[2]:
                # A live entry is being replaced: it becomes stale.
                self.size_diff += 1
                # put() adds one to unfinished_tasks right after this hook
                # returns.  The superseded entry will never be handed out
                # by get(), so un-count it here to keep join() and
                # task_done() consistent with the items actually delivered.
                self.unfinished_tasks -= 1
            validity[2] = True
        # Heap entries carry the serial so stale ones can be recognised.
        # Push directly instead of delegating to PriorityQueue._put: the
        # base hook only accepts a ``heappush`` argument on Python 2.
        heappush(self.queue, (item, validity[0]))

    def _get(self, heappop=heapq.heappop):
        """Pop entries until a live one is found and return its item."""
        while True:
            item, serial = heappop(self.queue)
            key = item[1]
            validity = self.values[key]
            if validity[1] <= 1:
                # Last heap entry for this key: forget the key entirely.
                del self.values[key]
            else:
                validity[1] -= 1
            if serial == validity[0]:
                validity[2] = False  # the live entry has been consumed
                return item
            # Stale entry discarded; it no longer inflates the heap size.
            self.size_diff -= 1

    def _qsize(self, len=len):
        """Return the number of live items (heap size minus stale entries)."""
        return len(self.queue) - self.size_diff

    def task_done(self):
        """Mark one retrieved item as processed.

        Superseded entries are removed from ``unfinished_tasks`` at put
        time, so the standard accounting of the base class applies as is.

        Raises ValueError if called more times than items were retrieved.
        """
        return PriorityQueue.task_done(self)
def test_uniquequeue():
    """Exercise both unique queues with duplicate and replacing puts.

    For each queue class, first checks that a key can be queued again once
    its item has been retrieved, then drains a queue that received three
    distinct keys (one of them put three times) and checks the result.
    """
    for U in (UniquePriorityQueue, UniquePriorityQueueWithReplace):
        u = U()
        # No try/except here: an unexpected exception (e.g. Empty) should
        # fail the test with its own traceback rather than a bare assert.
        u.put((0.2, 'foo'))
        u.put((0.1, 'foo'))
        u.get_nowait()
        u.put((0.2, 'foo'))
        u.get_nowait()

        u.put((0.2, 'foo'))
        u.put((0.3, 'bar'))
        u.put((0.1, 'baz'))
        u.put((0.5, 'foo'))
        u.put((0.4, 'foo'))

        drained = []
        while not u.empty():
            item = u.get_nowait()
            print(item)
            drained.append(item)
        print(drained)

        assert len(drained) == 3
        if U is UniquePriorityQueue:
            # Later 'foo' puts are dropped, so the first one (0.2) survives.
            assert drained == [(0.1, 'baz'), (0.2, 'foo'), (0.3, 'bar')]
        else:
            # Each 'foo' put replaces the previous one, so the last (0.4) wins.
            assert drained == [(0.1, 'baz'), (0.3, 'bar'), (0.4, 'foo')]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment