Skip to content

Instantly share code, notes, and snippets.

@Greyvend
Created June 16, 2020 08:56
Show Gist options
  • Save Greyvend/8474814094175559fed215dfd80df603 to your computer and use it in GitHub Desktop.
Save Greyvend/8474814094175559fed215dfd80df603 to your computer and use it in GitHub Desktop.
Addressable Binary Heap
import math
from operator import lt
class Heap:
    """Binary heap kept in a flat list, with a pluggable comparison function.

    ``cmp(a, b)`` drives every ordering decision: when it is true, ``a`` may
    sit above ``b``. The default, ``operator.lt``, yields a min-heap; passing
    ``operator.gt`` yields a max-heap.
    """

    def __init__(self, cmp=lt):
        """Create an empty heap.

        :param cmp: two-argument predicate used to order elements
        """
        self.len = 0  # number of stored elements, kept equal to len(self.arr)
        self.arr = []  # backing list; children of index i live at 2i+1, 2i+2
        self.cmp = cmp

    def peek(self):
        """Take a peek at the top of the heap, without touching it.

        :return: the top element, or None when the heap is empty
        """
        if not self.arr:
            return None
        return self.arr[0]

    def insert(self, value):
        """Insert new value in the heap. It might be duplicated.

        :param value: any kind of value that supports comparison operation
            specified during initialization
        """
        self.arr.append(value)
        self.len += 1
        self._pop_up()

    def popitem(self):
        """Remove top element from the heap if it exists, do nothing otherwise.

        :return: the removed top element, or None when the heap is empty
        """
        # Test the storage for emptiness, not the value: a falsy top element
        # such as 0 or '' is still a real element and has to be removed.
        if not self.arr:
            return None
        peak = self.arr[0]
        # Move the top to the end so it can be dropped from the list tail,
        # then let the element that replaced it sink to its proper place.
        self._switch(0, self.len - 1)
        self.arr.pop()
        self.len -= 1
        self._push_down()
        return peak

    def __len__(self):
        return self.len

    def __repr__(self):
        return '\n'.join(map(str, self.arr))

    def _switch(self, i, j):
        """Switch two heap elements"""
        self.arr[i], self.arr[j] = self.arr[j], self.arr[i]

    def _pop_up(self, cur=None):
        """One of the two basic heap operations, bring new element up the heap.

        :param cur: index of the element in the heap array, last one if not
            specified
        """
        if self.len < 1:
            return
        # Compare with None explicitly: index 0 is a valid argument and must
        # not be mistaken for "not specified".
        if cur is None:
            cur = self.len - 1
        parent = (cur - 1) // 2  # evaluates to -1 for the root, ending the loop
        # Swap whenever cmp does not place the parent before the child; this
        # includes ties, so an element rises past parents it compares equal to.
        while parent >= 0 and not self.cmp(self.arr[parent], self.arr[cur]):
            self._switch(parent, cur)
            cur = parent
            parent = (cur - 1) // 2

    def _push_down(self, cur=None):
        """Another basic heap operation, push the element down the heap.

        :param cur: index of the element in the heap array, top one if not
            specified
        """
        if cur is None:
            cur = 0
        while 2 * cur + 1 < self.len:
            child = 2 * cur + 1
            # Pick whichever child cmp ranks first, so that after a swap the
            # new parent is correctly ordered against both children.
            if child + 1 < self.len and self.cmp(self.arr[child + 1],
                                                 self.arr[child]):
                child += 1
            if not self.cmp(self.arr[child], self.arr[cur]):
                break
            self._switch(child, cur)
            cur = child
class AddressableHeap(Heap):
    """Heap whose items can be located and re-prioritised by a handle.

    Each element is a tuple of the form (h, a1, a2, ..., an) whose first
    element ``h`` is the handle. The default comparison function compares
    whole tuples and would therefore prioritise on the handle first, so a
    suitable ``cmp`` should be supplied.
    """

    def __init__(self, **kwargs):
        """Create an empty addressable heap.

        :param kwargs: forwarded unchanged to ``Heap.__init__`` (e.g. ``cmp``)
        """
        # handle -> current index of that element in self.arr
        self.addresses = {}
        super().__init__(**kwargs)

    def insert(self, value):
        """Change existing handle if it is present, insert otherwise.

        :param value: tuple whose first element is the handle
        """
        handle = value[0]
        if handle in self.addresses:
            self._change(value)
            return
        # The new element is appended at index self.len; record that before
        # the base class sifts it up, since every swap updates the mapping.
        self.addresses[handle] = self.len
        super().insert(value)

    def popitem(self):
        """Remove the top element and forget its handle.

        :return: the removed top element, or None if nothing was removed
        """
        peak = super().popitem()
        if peak is not None:
            self.addresses.pop(peak[0])
        return peak

    def _switch(self, i, j):
        """Switch two heap elements and keep the handle index in sync."""
        handle_1 = self.arr[i][0]
        handle_2 = self.arr[j][0]
        self.addresses[handle_1], self.addresses[handle_2] = j, i
        super()._switch(i, j)

    def _change(self, new_value):
        """Change existing element to a new value and restore heap property.

        This is basically the main logic of addressable heap. We use addresses
        to locate existing element by its handle, change its value in the heap
        and restore the heap property by pushing down/popping the element up
        based on the new value it has.

        :param new_value: replacement tuple; its handle must already be stored
        """
        addr = self.addresses[new_value[0]]
        # Decide the direction before overwriting: if the new value ranks
        # ahead of the old one it can only need to rise, otherwise only sink.
        moves_up = self.cmp(new_value, self.arr[addr])
        self.arr[addr] = new_value
        if not moves_up:
            self._push_down(addr)
        elif addr > 0:
            # The root has nowhere to rise to, so no sift-up is needed there.
            self._pop_up(addr)
from operator import gt
from heap import Heap, AddressableHeap
def test_heap_min():
    """Default comparison: check the array layout after inserts and pops."""
    heap = Heap()
    for number in (3, 4, 8, 9, 7, 10, 9, 15, 20, 13):
        heap.insert(number)
    assert heap.arr == [3, 4, 8, 9, 7, 10, 9, 15, 20, 13]
    assert heap.peek() == 3

    # A value smaller than everything stored ends up at the root.
    heap.insert(2)
    assert heap.arr == [2, 3, 8, 9, 4, 10, 9, 15, 20, 13, 7]
    assert heap.peek() == 2

    # Expected array after each successive removal of the top element.
    layouts_after_pop = (
        [3, 4, 8, 9, 7, 10, 9, 15, 20, 13],
        [4, 7, 8, 9, 13, 10, 9, 15, 20],
        [7, 9, 8, 15, 13, 10, 9, 20],
    )
    for layout in layouts_after_pop:
        heap.popitem()
        assert heap.arr == layout
        assert heap.peek() == layout[0]
def test_heap_max():
    """With ``gt`` as comparison: check the array layout after each step."""
    heap = Heap(cmp=gt)
    for number in (3, 4, 8, 9, 7, 10, 9, 15, 20, 13):
        heap.insert(number)
    layout = [20, 15, 9, 10, 13, 4, 9, 3, 8, 7]
    assert heap.arr == layout
    assert heap.peek() == 20

    # A value smaller than everything stored stays where it was appended.
    heap.insert(2)
    assert heap.arr == layout + [2]
    assert heap.peek() == 20

    heap.popitem()
    assert heap.arr == [15, 13, 9, 10, 7, 4, 9, 3, 8, 2]
    assert heap.peek() == 15
def test_addressable_heap_small():
    """Empty, one-element and two-element addressable heaps."""

    def by_priority(x, y):
        # Order on the second tuple field; the first field is the handle.
        return x[1] < y[1]

    heap = AddressableHeap(cmp=by_priority)
    assert heap.peek() is None
    heap.popitem()  # check that it doesn't fail

    # Insert one element and remove it again: nothing may be left behind.
    heap.insert(('a', 3))
    heap.popitem()
    assert heap.arr == []
    assert heap.addresses == {}

    # With two elements, removing the top leaves the other one at index 0.
    for item in (('a', 3), ('b', 4)):
        heap.insert(item)
    heap.popitem()
    assert heap.arr == [('b', 4)]
    assert heap.addresses == {'b': 0}
def test_addressable_heap_large():
    """Inserts, priority changes in both directions, then removals."""

    def by_priority(x, y):
        # Order on the second tuple field; the first field is the handle.
        return x[1] < y[1]

    heap = AddressableHeap(cmp=by_priority)
    for item in (('a', 3), ('b', 4), ('c', 8), ('d', 9), ('e', 7)):
        heap.insert(item)
    assert heap.arr == [('a', 3), ('b', 4), ('c', 8), ('d', 9), ('e', 7)]
    assert heap.peek() == ('a', 3)

    # insert new element
    heap.insert(('f', 2))
    assert heap.arr == [
        ('f', 2), ('b', 4), ('a', 3), ('d', 9), ('e', 7), ('c', 8),
    ]
    assert heap.peek() == ('f', 2)

    # set smaller value to existing element
    heap.insert(('b', 1))
    assert heap.arr == [
        ('b', 1), ('f', 2), ('a', 3), ('d', 9), ('e', 7), ('c', 8),
    ]
    assert heap.addresses == {
        'b': 0, 'f': 1, 'a': 2, 'd': 3, 'e': 4, 'c': 5,
    }
    assert heap.peek() == ('b', 1)

    # set larger value to existing element
    heap.insert(('f', 10))
    assert heap.arr == [
        ('b', 1), ('e', 7), ('a', 3), ('d', 9), ('f', 10), ('c', 8),
    ]
    assert heap.addresses == {
        'b': 0, 'e': 1, 'a': 2, 'd': 3, 'f': 4, 'c': 5,
    }
    assert heap.peek() == ('b', 1)

    # remove some elements
    heap.popitem()
    assert heap.arr == [('a', 3), ('e', 7), ('c', 8), ('d', 9), ('f', 10)]
    assert heap.addresses == {'a': 0, 'e': 1, 'c': 2, 'd': 3, 'f': 4}
    assert heap.peek() == ('a', 3)

    heap.popitem()
    assert heap.arr == [('e', 7), ('d', 9), ('c', 8), ('f', 10)]
    assert heap.addresses == {'e': 0, 'd': 1, 'c': 2, 'f': 3}
    assert heap.peek() == ('e', 7)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment