Skip to content

Instantly share code, notes, and snippets.

@cabiad
Last active December 13, 2015 20:19
Show Gist options
  • Save cabiad/4969474 to your computer and use it in GitHub Desktop.
Save cabiad/4969474 to your computer and use it in GitHub Desktop.
A simple Python2 binary min-heap with a simple heapsort implementation. Also, some fuzz testing and a bunch of unit tests for the heap and its helper methods.
####################################
# Copyright Christopher Abiad, 2013
# All Rights Reserved
####################################
"""Binary min-heap with a simple heapsort implementation.
Heap is stored entirely in an array (Python list) using the property that
for any node at position i (0-based), its two children can be stored at
2i + 1 and 2i + 2 without any collisions with other nodes.
"""
__author__ = 'Christopher Abiad'
from copy import copy
from math import ceil
class NoSuchNodeException(Exception):
    """Raised when a node that does not exist is requested.

    parent_index() raises it when asked for the parent of the root.
    """
def _verify_heap_property(h):
"""Verify the heap property, namely that every node dominates
its children.
For our min-heap implementation, this means that all parents
must be smaller than both children.
"""
for i in range(1, len(h)):
pi = parent_index(i)
result = h[pi] <= h[i]
msg = "{h}: Child {i} (val {vi}) is not dominated by parent {pi} " \
"(val {vp})".format(h=h, i=i, vi=h[i], pi=pi, vp=h[pi])
if not result:
return result, msg
return True, ''
def parent_index(index):
    """Return the index of the parent of the node at ``index``.

    The children of node p are stored at 2p + 1 and 2p + 2, so the
    parent of node i is (i - 1) // 2.

    Raises NoSuchNodeException when ``index`` is 0 or negative: the root
    (index 0) has no parent.
    """
    if index <= 0:
        raise NoSuchNodeException("Root node has no parent")
    # Floor division keeps the arithmetic in integers, so the result is
    # exact for any index and does not depend on what ceil() returns.
    return (index - 1) // 2
def young_child_index(index):
    """Return the index of the first ("young") child of node ``index``.

    The position is computed arithmetically; the caller must check that
    it actually falls inside the heap.
    """
    return 2 * index + 1
def bubble_up(heap, i):
    """Move the item at index ``i`` towards the root, in place.

    The item is swapped with its parent repeatedly. It stops when it
    reaches the root or when it compares greater than or equal to its
    parent.
    """
    child = i
    while child != 0:
        parent = parent_index(child)
        if heap[child] >= heap[parent]:
            # Parent already dominates this item; nothing more to do.
            break
        heap[child], heap[parent] = heap[parent], heap[child]
        child = parent
def bubble_down(heap, i):
    """Move the item at index ``i`` towards the leaves, in place.

    At each step the item is compared with its children and swapped with
    the smallest of them if that child is smaller than the item. It stops
    when the item has no children or when neither child is smaller.
    """
    current = i
    while True:
        young = young_child_index(current)
        if young >= len(heap):
            # No children at all: the item is a leaf.
            return
        # Find the smallest of the node and its one or two children.
        smallest = young if heap[young] < heap[current] else current
        old = young + 1
        # The old child may lie beyond the end of the list.
        if old < len(heap) and heap[old] < heap[smallest]:
            smallest = old
        if smallest == current:
            # The node already dominates both children.
            return
        heap[smallest], heap[current] = heap[current], heap[smallest]
        current = smallest
def make_heap(l):
    """Build a min-heap from the items of ``l`` by repeated insertion.

    Each item is appended to a new list and then bubbled up into place.
    ``l`` is only iterated over; the new list is returned.
    """
    heap = []
    for val in l:
        heap.append(val)
        # The new item always lands in the last slot.
        bubble_up(heap, len(heap) - 1)
    return heap
def make_heap_faster(l):
    """An alternative heap-making approach with better performance.

    In a valid heap, leaf nodes, which already satisfy the heap property,
    start at l[len(l) // 2]. We iterate backwards over the non-leaf nodes
    and bubble_down() each one, creating and merging valid heaps of
    increasing size. The work is done on a shallow copy of ``l``, which
    is returned; ``l`` itself is left untouched.

    A brief discussion of why this approach results in linear-time (O(n))
    heap construction follows. A more detailed, and likely clearer,
    description is available in Skiena's "Algorithm Design Manual"
    on page 115.

    On initial analysis, one might determine that since bubble_down()
    is O(log(n)) and we call it n/2 times, this method is also
    O(n*log(n)). However, only the final item (the root) can actually
    bubble down log(n) levels. The bottom n/2 items are 'single item
    heaps' and need no rebalancing. The next n/4 items can only bubble
    down one level. In general, there are at most ceil(n / 2**(h+1))
    nodes of height h, so the cost of building the heap is:

        sum from h=0 to floor(log(n)) of:
            ceil(n / 2**(h+1))  # Number of nodes at height h
            * h                 # Cost per node

    After some rearranging, it can also be shown that this value is
    smaller than:

        n * sum from h=0 to floor(log(n)) of:
            h / 2**h

    The denominator grows exponentially while the numerator grows only
    linearly, so the sum converges to a constant (the infinite series
    sums to 2), leading to an overall linear time cost.
    """
    heap = copy(l)
    # (len(l) // 2 - 1) may be tricky to understand. Study cases up to
    # len(l) == 8 to see that it always picks the last item that has 1 or
    # more children. Floor division is required here: range() only accepts
    # integers, and plain / produces a float on Python 3.
    for i in range((len(l) // 2) - 1, -1, -1):
        bubble_down(heap, i)
    return heap
def extract_min(heap):
    """Remove and return the smallest item (the root) of ``heap``.

    The last item is moved into the root slot and bubbled down, which
    restores the heap property. Raises IndexError when ``heap`` is empty.
    """
    smallest = heap[0]
    last = heap.pop()
    if heap:
        # Items remain: the former last item becomes the root and
        # sinks to its proper place.
        heap[0] = last
        bubble_down(heap, 0)
    return smallest
def heapsort(l):
    """Return a new list holding the items of ``l`` in ascending order.

    A heap is built from ``l`` with make_heap_faster() and then emptied
    one minimum at a time with extract_min().
    """
    heap = make_heap_faster(l)
    # Each extract_min() call removes exactly one item, so one call per
    # item present at the start drains the heap completely.
    return [extract_min(heap) for _ in range(len(heap))]
def sorted(l):
    """Generate the items of ``l`` in ascending order.

    This is a generator: the heap is built when the first item is
    requested, and each further item costs one extract_min() call.
    Note that within this module the name hides the built-in sorted().
    """
    heap = make_heap_faster(l)
    while heap:
        yield extract_min(heap)
if __name__ == '__main__':
    # Fuzz test: build heaps from random data with both construction
    # strategies, verify the heap property of each, then check the
    # output of heapsort.
    from random import randrange

    l = [randrange(1000) for _ in range(10000)]

    # print(...) with a single string argument produces the same output
    # on Python 2 and Python 3.
    print("Making heap (SLOW: iterate add and bubble up)")
    h = make_heap(l)
    print("Making heap (SLOW: iterate add and bubble up)...done")
    result, msg = _verify_heap_property(h)
    assert result, msg
    print("Making heap (SLOW: iterate add and bubble up)...heap verified\n")

    print("Making heap (FASTER: bubble down non-leafs)")
    hf = make_heap_faster(l)
    print("Making heap (FASTER: bubble down non-leafs)...done")
    result, msg = _verify_heap_property(hf)
    assert result, msg
    print("Making heap (FASTER: bubble down non-leafs)...heap verified\n")

    print("Heapsort")
    s = heapsort(l)
    print("Heapsort...done")
    # Check against the built-in list sort. In this module the name
    # `sorted` refers to the heap-based generator, so comparing with it
    # alone would only compare the heap code with itself.
    expected = list(l)
    expected.sort()
    assert s == expected
    assert list(sorted(l)) == expected
    print("Heapsort...verified")
####################################
# Copyright Christopher Abiad, 2013
# All Rights Reserved
####################################
__author__ = 'Christopher Abiad'
import heap
import unittest
class HeapTestCase(unittest.TestCase):
    """Unit tests for the heap module's helpers and public functions."""

    def _assert_valid_heap(self, candidate):
        """Fail unless ``candidate`` satisfies the heap property."""
        ok, why = heap._verify_heap_property(candidate)
        self.assertTrue(ok, why)

    def test_verify_heap_property_valid(self):
        valid_heaps = ([1, 3, 2, 6, 4, 5],
                       [1, 3, 5],
                       [1],
                       [])
        for candidate in valid_heaps:
            self._assert_valid_heap(candidate)

    def test_verify_heap_property_invalid(self):
        invalid_heaps = ([2, 3, 5, 6, 4, 1],
                         [3, 5, 1])
        for candidate in invalid_heaps:
            ok, _ = heap._verify_heap_property(candidate)
            self.assertFalse(
                ok, 'invalid heap {h} returned valid!'.format(h=candidate))

    def test_parent_index(self):
        # The root has no parent.
        with self.assertRaises(heap.NoSuchNodeException):
            heap.parent_index(0)
        expected_parents = ((1, 0), (2, 0), (3, 1), (4, 1), (5, 2), (6, 2))
        for child, parent in expected_parents:
            self.assertEqual(heap.parent_index(child), parent)

    def test_young_child_index(self):
        expected_children = ((0, 1), (1, 3), (2, 5))
        for parent, child in expected_children:
            self.assertEqual(heap.young_child_index(parent), child)

    def test_bubble_up_single_item(self):
        items = [1]
        heap.bubble_up(items, 0)
        self._assert_valid_heap(items)
        self.assertEqual(items, [1])

    def test_bubble_up_no_op(self):
        items = [1, 3, 5]
        heap.bubble_up(items, 2)
        self._assert_valid_heap(items)
        self.assertEqual(items, [1, 3, 5])

    def test_bubble_up_single(self):
        items = [3, 5, 1]
        heap.bubble_up(items, 2)
        self._assert_valid_heap(items)
        self.assertEqual(items, [1, 5, 3])

    def test_bubble_up_with_recursion(self):
        items = [2, 3, 5, 6, 4, 1]
        heap.bubble_up(items, 5)
        self._assert_valid_heap(items)
        self.assertEqual(items, [1, 3, 2, 6, 4, 5])

    def test_bubble_down_to_young_child(self):
        items = [2, 3, 5, 6, 4, 1]
        heap.bubble_down(items, 2)
        self.assertEqual(items, [2, 3, 1, 6, 4, 5])

    def test_bubble_down_to_old_child(self):
        items = [2, 3, 6, 6, 1]
        heap.bubble_down(items, 1)
        self.assertEqual(items, [2, 1, 6, 6, 3])

    def test_make_heap_empty(self):
        built = heap.make_heap([])
        self._assert_valid_heap(built)
        self.assertEqual(built, [])

    def test_make_heap_faster_empty(self):
        built = heap.make_heap_faster([])
        self._assert_valid_heap(built)
        self.assertEqual(built, [])

    def test_make_heap(self):
        built = heap.make_heap([2, 3, 6, 6, 1, 23, 6, 6, 324])
        self._assert_valid_heap(built)

    def test_make_heap_faster(self):
        built = heap.make_heap_faster([6, 4, 5, 5, 2, 3])
        self._assert_valid_heap(built)

    def test_extract_min(self):
        items = [1, 3, 5]
        self.assertEqual(heap.extract_min(items), 1)
        self.assertEqual(items, [3, 5])

    def test_heapsort(self):
        ordered = heap.heapsort([8, 6, 7, 5, 3, 0, 9])
        self.assertEqual(ordered, [0, 3, 5, 6, 7, 8, 9])

    def test_sorted(self):
        ordered = list(heap.sorted([9, 6, 7, 11, 11]))
        self.assertEqual(ordered, [6, 7, 9, 11, 11])
# Run the test suite when this file is executed directly.
if __name__ == "__main__":
    unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment