Skip to content

Instantly share code, notes, and snippets.

@joyrexus
Created May 6, 2014 21:06
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save joyrexus/eda5cabd9367daf45628 to your computer and use it in GitHub Desktop.
Save joyrexus/eda5cabd9367daf45628 to your computer and use it in GitHub Desktop.
Priority queue (with delete by label)

A simple priority queue implementation with O(n log n) sorting. The underlying data structure is a binary heap.

This implementation uses a binary heap where each node is less than or equal to its children. Nodes can be anything as long as they're comparable.

Like all priority queues, this implementation efficiently retrieves the minimum element (by comparative value) in the queue. You can also insert elements, and you can delete elements by label if they are "labeled". (See examples below.)

Usage

from heap import PriorityQueue

q = PriorityQueue([3, 1, 2, 4])

assert q.min == 1                 # get  minimum element
assert q.sort() == [1, 2, 3, 4]   # get sorted list of elements

x = q.shift()                     # shift off minimum element
assert x == 1
assert q.sort() == [2, 3, 4]

Alternatively, you can populate your priority queue with Node instances:

a = Node(label='a', priority=1)

Nodes are just Python dicts that compare by their priority key.

from heap import Node, PriorityQueue

a = Node(label='a', msg="boom!", priority=1)
b = Node(label='b', msg="hi", priority=2)
c = Node(label='c', msg="ok", priority=3)
d = Node(label='d', msg="oh", priority=4)

assert a < b < c < d

If you initialize your queue with Node objects containing node.label attributes, you can then delete nodes by label:

q = PriorityQueue([b, c, d])

assert q.min == b
assert q.min.msg == 'hi'
assert q.min.label == 'b'
assert q == [b, c, d]

q.insert(a)
assert q.min == a
assert q.min.msg == 'boom!'
assert q.min.label == 'a'
assert q.sort() == [a, b, c, d]   # get sorted list of elements

min = q.shift()                   # shift off minimum element
assert min == a
assert min.label == 'a'
assert q.sort() == [b, c, d]

assert q.delete('c') == c         # delete a node by `node.label`
assert q.sort() == [b, d]
assert q.min == b

See Also

from collections import deque
def is_iterable(obj):
    '''
    Report whether `obj` can be passed to `iter()`.

    Returns True when `iter(obj)` succeeds and False when it raises
    TypeError.
    '''
    try:
        iter(obj)
    except TypeError:
        return False
    else:
        return True
def has_label(obj):
    '''
    Test if `obj` has a usable "label" attribute.

    Returns True when `obj.label` exists and is not None, otherwise
    False.  Comparing against None (rather than relying on truthiness)
    means falsy labels such as 0 or '' still count as labels, while
    objects that answer None for unknown attributes are treated as
    unlabeled.
    '''
    return getattr(obj, 'label', None) is not None
def are_labeled(objs):
    '''
    Test if every member of `objs` passes both `is_iterable` and
    `has_label`.

    Stops at the first member that fails either check.  An empty
    `objs` yields True.
    '''
    for obj in objs:
        if not (is_iterable(obj) and has_label(obj)):
            return False
    return True
class PriorityQueue(object):
    '''
    Queue of elements/nodes ordered by priority.

    Implementation uses a binary heap where each node is less than
    or equal to its children.  Nodes can be anything as long as they
    can be compared with each other using `<`.

    The heap lives in `self.heap`, a 1-indexed list whose slot 0 holds
    a `None` placeholder, so the parent of index `i` is `i // 2` and
    its children are `2 * i` and `2 * i + 1`.  `self.size` is the
    number of real elements.

    If you initialize your queue with node elements that all carry a
    non-None `node.label` attribute, `self.labeled` is set and
    `self.position` maps every label to the current heap index of its
    node, which lets you delete nodes by label.
    '''

    def __init__(self, nodes):
        '''
        Build a queue from the iterable `nodes`.
        '''
        self.size = 0
        # A list, not a deque: the heap is indexed in the middle all the
        # time, which is O(1) on a list but O(n) on a deque.
        self.heap = [None]
        self.labeled = False
        self.position = {}
        nodes = list(nodes)  # lets one-shot iterators be scanned twice
        for n in nodes:
            self.insert(n)
        # Vacuously true for an empty input, so a queue created empty can
        # still track labeled nodes that are inserted later.
        if all(self._label_of(n) is not None for n in nodes):
            self.labeled = True
            self._reindex()

    def __str__(self):
        return str(self.heap[1:])

    def __eq__(self, other):
        return self.heap[1:] == other

    def __ne__(self, other):
        # Python 2 does not derive `!=` from `__eq__`.
        return not self == other

    @staticmethod
    def _label_of(node):
        '''
        Return `node.label`, or None when there is no such attribute.
        '''
        return getattr(node, 'label', None)

    def _track(self, i):
        '''
        Record index `i` under the label of the node stored there.
        Nodes without a label are skipped.
        '''
        label = self._label_of(self.heap[i])
        if label is not None:
            self.position[label] = i

    def _reindex(self):
        '''
        Rebuild the label -> heap index map from the current heap.
        '''
        self.position = {}
        for i in range(1, self.size + 1):
            self._track(i)

    def node(self, i):
        '''
        Return {index, value} of node at index i.

        This is used for testing parent/child relations.
        '''
        return dict(index=i, value=self.heap[i])

    def parent(self, child):
        '''
        Return {index, value} for parent of child node.
        '''
        return self.node(child['index'] // 2)

    def children(self, parent):
        '''
        Return list of child nodes for parent.

        The list has two, one or (for a leaf) zero entries.
        '''
        p = parent['index']
        return [self.node(c) for c in (p * 2, p * 2 + 1) if c <= self.size]

    def swap(self, i, j):
        '''
        Swap the values of nodes at index i and j.
        '''
        self.heap[i], self.heap[j] = self.heap[j], self.heap[i]
        if self.labeled:
            self._track(i)
            self._track(j)

    def shift_up(self, i):
        '''
        Percolate upward the value at index i to restore heap property.
        '''
        p = i // 2  # index of parent node; 0 means i is the root
        while p and self.heap[i] < self.heap[p]:
            self.swap(i, p)
            i, p = p, p // 2

    def shift_down(self, i):
        '''
        Percolate downward the value at index i to restore heap property.
        '''
        while i * 2 <= self.size:
            c = self.min_child(i)
            if not self.heap[c] < self.heap[i]:
                break  # already no larger than its smallest child
            self.swap(i, c)
            i = c

    def min_child(self, i):
        '''
        Return index of minimum child node.

        Expects the node at `i` to have at least a left child.
        '''
        l, r = (i * 2), (i * 2 + 1)  # indices of left and right children
        if r > self.size:
            return l
        return l if self.heap[l] < self.heap[r] else r

    @property
    def min(self):
        '''
        Return minimum node in heap, or None if the heap is empty.
        '''
        return self.heap[1] if self.size else None

    @property
    def top(self):
        '''
        Return top/minimum element in heap.
        '''
        return self.min

    def shift(self):
        '''
        Shift off top/minimum node in heap.

        Returns None if the heap is empty.
        '''
        return self.pop(1)

    def pop(self, i):
        '''
        Pop off node at index `i` in heap.

        Returns the removed node, or None if the heap is empty.
        Raises IndexError if `i` is not in `1..size`.
        '''
        if self.size == 0:
            return None
        if not 1 <= i <= self.size:
            raise IndexError('heap index out of range: {}'.format(i))
        last = self.size
        v = self.heap[i]
        self.swap(last, i)  # move last element to i
        self.heap.pop()     # drop the node being removed
        self.size -= 1
        if self.labeled:
            label = self._label_of(v)
            # Forget the removed node, unless another node with the same
            # label has already claimed the entry.
            if label is not None and self.position.get(label) == last:
                del self.position[label]
        if i <= self.size:
            # The element moved into slot i may belong lower or higher.
            self.shift_down(i)
            self.shift_up(i)
        return v

    def delete(self, label):
        '''
        Pop off node with specified label attribute.

        Returns the removed node.  If no node with that label is being
        tracked, prints a message and returns None.
        '''
        i = self.position.get(label)
        if i is None:
            print('node with label "{}" does not exist'.format(label))
            return None
        return self.pop(i)

    def insert(self, node):
        '''
        Append `node` to the heap and percolate up
        if necessary to maintain heap property.
        '''
        self.heap.append(node)
        self.size += 1
        if self.labeled:
            # Record the slot the node really occupies before it moves.
            self._track(self.size)
        self.shift_up(self.size)

    def sort(self):
        '''
        Return sorted array of elements in current heap.

        The queue keeps all its elements: they are shifted off in order
        and the heap is rebuilt from the sorted list.
        '''
        ordered = [self.shift() for _ in range(self.size)]
        self.heap = [None] + ordered
        self.size = len(ordered)
        if self.labeled:
            self._reindex()
        return ordered
class Node(dict):
    '''
    Nodes are just dicts comparable by their `priority` key.

    Two nodes are equal when their priorities are equal, regardless
    of their other keys.  Keys can also be read as attributes; a key
    that is absent reads as None.

    Your nodes should contain `label` attributes when you're
    inserting into a PriorityQueue and you'd like to delete
    a node by its label from the underlying heap.

    >>> a = Node(dict(label='a', priority=5, msg='hi!'))
    '''

    # Marker for "the other operand has no readable priority".
    _NO_PRIORITY = object()

    def _priority_of(self, other):
        '''
        Return `other['priority']`, or the `_NO_PRIORITY` marker when
        `other` cannot be subscripted that way.
        '''
        try:
            return other['priority']
        except (TypeError, KeyError, IndexError):
            return self._NO_PRIORITY

    def __cmp__(self, other):
        '''
        should return a negative integer if self < other, zero if
        self == other, and positive if self > other.

        Only consulted by Python 2; Python 3 uses the rich comparison
        methods below.
        '''
        mine, theirs = self['priority'], other['priority']
        return (mine > theirs) - (mine < theirs)

    def __eq__(self, other):
        theirs = self._priority_of(other)
        if theirs is self._NO_PRIORITY:
            return NotImplemented
        return self['priority'] == theirs

    def __ne__(self, other):
        theirs = self._priority_of(other)
        if theirs is self._NO_PRIORITY:
            return NotImplemented
        return self['priority'] != theirs

    def __lt__(self, other):
        theirs = self._priority_of(other)
        if theirs is self._NO_PRIORITY:
            return NotImplemented
        return self['priority'] < theirs

    def __le__(self, other):
        theirs = self._priority_of(other)
        if theirs is self._NO_PRIORITY:
            return NotImplemented
        return self['priority'] <= theirs

    def __gt__(self, other):
        theirs = self._priority_of(other)
        if theirs is self._NO_PRIORITY:
            return NotImplemented
        return self['priority'] > theirs

    def __ge__(self, other):
        theirs = self._priority_of(other)
        if theirs is self._NO_PRIORITY:
            return NotImplemented
        return self['priority'] >= theirs

    def __getattr__(self, attr):
        '''
        Expose dict keys as attributes; missing keys read as None.
        '''
        # Special-method probes must fail normally, otherwise callers
        # that test for an optional hook receive None instead of
        # "not there".
        if attr.startswith('__') and attr.endswith('__'):
            raise AttributeError(attr)
        return self.get(attr, None)
if __name__ == '__main__':
    # Self-check, part 1: a queue of plain integers.
    numbers = PriorityQueue([3, 1, 2, 4])
    assert numbers.min == 1
    assert numbers.sort() == [1, 2, 3, 4]

    smallest = numbers.shift()
    assert smallest == 1
    assert numbers.sort() == [2, 3, 4]

    # Self-check, part 2: a queue of Node objects.
    a = Node(label='a', msg="boom!", priority=1)
    b = Node(label='b', msg="hi", priority=2)
    c = Node(label='c', msg="ok", priority=3)
    d = Node(label='d', msg="oh", priority=4)
    e = Node(msg="no", priority=5)  # constructed only; no check uses it
    assert a < b < c < d

    labeled = PriorityQueue([b, c, d])
    assert labeled.top == b
    assert labeled.top.msg == 'hi'
    assert labeled == [b, c, d]

    labeled.insert(a)
    assert labeled.sort() == [a, b, c, d]
from heap import Node, PriorityQueue
# Module-level queue used by the tests below that refer to `q` without
# rebinding it; several of them mutate it in place (insert/shift/pop).
q = PriorityQueue([5, 4, 3])
def test_basic():
    '''
    Check the minimum, size and heap layout of the module-level
    queue `q`.
    '''
    smallest = q.min
    assert smallest == 3
    count = q.size
    assert count == 3
    expected_layout = [3, 5, 4]
    assert q == expected_layout
def test_insert():
    '''
    Testing `insert` method, which inserts a new element into the queue
    and reorders the underlying MinHeap if necessary.

    Each inserted value is smaller than everything already in the
    module-level queue `q`, so it must become the new minimum.
    '''
    steps = [
        (2, [2, 3, 4, 5]),
        (1, [1, 2, 4, 5, 3]),
    ]
    for value, layout in steps:
        q.insert(value)
        assert q.min == value
        assert q.size == len(layout)
        assert q == layout
def test_relations():
    '''
    Testing parent and child relations among elements.

    Uses the module-level queue `q`; the expected values assume the
    heap layout asserted at the end of `test_insert`.
    '''
    left = q.node(4)    # node at index 4
    right = q.node(5)   # node at index 5
    parent = q.node(2)  # parent of the two child nodes above

    assert left == {'index': 4, 'value': 5}
    assert right == {'index': 5, 'value': 3}
    assert parent == {'index': 2, 'value': 2}

    assert q.parent(left) == parent
    assert q.parent(right) == parent
    assert q.children(parent) == [left, right]
def test_shift():
    '''
    Testing the `shift` method, which shifts off min
    element and rearranges the underlying MinHeap if
    necessary to preserve the min-heap property.

    Afterwards the node at index 4 must be the only child of the
    node at index 2.
    '''
    removed = q.shift()
    assert removed == 1
    assert q == [2, 3, 4, 5]

    only_child = q.node(4)
    assert only_child == dict(index=4, value=5)

    its_parent = q.parent(only_child)
    assert its_parent == dict(index=2, value=3)
    assert q.children(its_parent) == [only_child]
def test_pop():
    '''
    Testing the `pop` method, which pops off nodes at a
    specified index and rearranges the underlying MinHeap
    if necessary to preserve the min-heap property.
    '''
    assert q == [2, 3, 4, 5]
    # Index 2 holds the value 3 in the layout asserted above; check the
    # returned value itself rather than just its truthiness.
    assert q.pop(2) == 3
    assert q == [2, 5, 4]
def test_sort():
    '''
    Test sorting (heap sort) of priority queues.

    Works on its own local queue, not the module-level one.
    '''
    unordered = [5, 1, 2, 4, 6, 3]
    heap = PriorityQueue(unordered)
    assert heap == [1, 4, 2, 5, 6, 3]
    assert heap.sort() == sorted(unordered)
def test_nodes():
    '''
    Test node object comparability.

    Nodes compare by `priority` only, so `b` and `c` below are equal
    even though their `msg` values differ.
    '''
    a = Node(msg="hi", priority=1)
    b = Node(msg="ok", priority=2)
    c = Node(msg="oh", priority=2)

    assert a < b
    assert a <= b
    assert not a == b
    assert not a > b
    assert not a >= b

    assert not b > c
    assert b >= c
    assert b == c

    # Compare string values with `==`; `is` tests object identity and
    # only passes when the interpreter happens to share the literal.
    assert a.msg == 'hi'

    x = Node(label='x', msg="boom!", priority=1)
    assert x.label == 'x'
def test_node_heaping():
    '''
    Test priority queues with nodes as elements.

    Covers insertion, deletion by label, shifting and sorting, and
    finally checks every insertion order of the same four nodes.
    '''
    from itertools import permutations

    a = Node(label='a', msg="boom!", priority=1)
    b = Node(label='b', msg="hi", priority=2)
    c = Node(label='c', msg="ok", priority=3)
    d = Node(label='d', msg="oh", priority=4)

    q = PriorityQueue([b, c, d])
    assert q.min == b
    assert q.min.msg == 'hi'
    assert q.min.label == 'b'
    assert q == [b, c, d]

    q.insert(a)
    assert q.min == a
    # String values are compared with `==`, not the identity test `is`.
    assert q.min.msg == 'boom!'
    assert q.min.label == 'a'
    assert q == [a, b, d, c]

    assert q.delete('c') == c
    assert q.sort() == [a, b, d]
    assert q.min == a
    assert q.min.label == 'a'

    smallest = q.shift()  # named so the builtin `min` is not shadowed
    assert smallest == a
    assert smallest.label == 'a'
    assert q.sort() == [b, d]
    assert q.min == b
    assert q.min.label == 'b'

    q = PriorityQueue([d, c, b, a])
    assert [a, b, c, d] == q.sort()
    assert [a, b, c, d] == [q.shift() for _ in range(q.size)]
    assert q.size == 0
    assert q == []

    nodes = [a, b, c, d]
    for perm in permutations(nodes):
        q = PriorityQueue(perm)
        assert [a, b, c, d] == q.sort()
        assert [a, b, c, d] == [q.shift() for _ in range(q.size)]
        assert q.size == 0
        assert q == []
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment