Skip to content

Instantly share code, notes, and snippets.

@LiutongZhou
Last active June 6, 2022 01:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save LiutongZhou/57a961aaa07b73e1bfbda0667e858fec to your computer and use it in GitHub Desktop.
Save LiutongZhou/57a961aaa07b73e1bfbda0667e858fec to your computer and use it in GitHub Desktop.
Priority queues
"""MinHeap and MaxHeap (Optimized Implementation)"""
from abc import ABC, abstractmethod
from collections import Counter, UserList
from functools import singledispatchmethod
from heapq import (
_heapify_max,
_heappop_max,
_heapreplace_max,
_siftdown,
_siftdown_max,
_siftup,
_siftup_max,
heapify,
heappop,
heappush,
heappushpop,
heapreplace,
)
from typing import Any, Iterable, Optional, Union, overload
__author__ = "Liutong Zhou"
__all__ = ["MinHeap", "MaxHeap"]
class _BasePriorityQueue(ABC, UserList):
    """Base class for MinHeap and MaxHeap.

    The heap lives in ``self.data`` (the list managed by ``UserList``).
    ``self._to_remove`` counts items that were marked by a lazy ``remove``;
    such items are physically dropped once they reach index 0.

    Notes
    -----
    ``len(self)`` counts the items physically stored, which includes items
    marked for lazy removal that have not yet surfaced at the top.
    """

    @abstractmethod
    def __init__(self, queue: Optional[Iterable] = None):
        """Copy ``queue`` into the heap storage and establish the invariant."""
        super().__init__(queue)
        self._to_remove = Counter()  # cache items for lazy removal
        self.heapify()

    @abstractmethod
    def heapify(self):
        """Rearrange ``self.data`` in place so it satisfies the heap invariant."""
        raise NotImplementedError

    @abstractmethod
    def push(self, element: Any):
        """Push an element onto the heap."""
        raise NotImplementedError

    @abstractmethod
    def pop(self) -> Any:
        """Pop and return the top element."""
        raise NotImplementedError

    @abstractmethod
    def pushpop(self, element: Any) -> Any:
        """Push an element and pop the top element"""
        raise NotImplementedError

    @abstractmethod
    def replace(self, element: Any) -> Any:
        """Pop the top element and push the element"""
        raise NotImplementedError

    @abstractmethod
    def siftup(self, pos: int):
        """Move the item at ``pos`` towards the leaves until it fits."""
        raise NotImplementedError

    @abstractmethod
    def siftdown(self, start_pos: int, pos: int):
        """Move the item at ``pos`` towards ``start_pos`` until it fits."""
        raise NotImplementedError

    def _normalize_index(self, i: int) -> int:
        """Return ``i`` as a non-negative position in ``self.data``.

        The sift helpers compute parent/child positions arithmetically, so a
        negative (from-the-end) index must be converted before it is used.

        Raises
        ------
        IndexError
            If ``i`` does not address an existing element.
        """
        size = len(self.data)
        if i < 0:
            i += size
        if not 0 <= i < size:
            raise IndexError("heap index out of range")
        return i

    def _restore_invariant(self, i: int) -> None:
        """Re-establish the heap invariant after ``self.data[i]`` changed."""
        self.siftup(i)
        # The root has no parent, so there is nothing to sift towards.
        # Skipping the call also matters for safety: siftup(0) may trigger a
        # lazy purge that shrinks (or empties) the heap, after which
        # position ``i`` may no longer exist.
        if 0 < i < len(self.data):
            self.siftdown(0, i)

    @singledispatchmethod
    def __delitem__(self, i: Union[int, slice]) -> None:
        """Deletes item i or sliced items from the heap"""
        raise NotImplementedError("See specific single dispatch implementation")

    @__delitem__.register
    def _(self, i: int) -> None:
        """Deletes item i from the heap in O(log N) time

        Negative indices count from the end, as for a list.

        Raises
        ------
        IndexError
            If ``i`` is out of range.
        """
        i = self._normalize_index(i)
        last = self.data.pop()
        if i < len(self.data):
            # Fill the hole with the former last item, then repair the heap.
            self.data[i] = last
            self._restore_invariant(i)

    @__delitem__.register
    def _(self, i: slice) -> None:
        """Deletes the items currently stored at the sliced positions.

        Deleting one index at a time would reshuffle the heap between
        deletions, so later indices would address different items. Instead
        the survivors are collected in one pass and the heap is rebuilt.
        """
        doomed = set(range(len(self.data))[i])
        if not doomed:
            return
        self.data[:] = [
            item for pos, item in enumerate(self.data) if pos not in doomed
        ]
        self.heapify()

    @overload
    def __setitem__(self, i: int, o: Any) -> None:
        ...

    @overload
    def __setitem__(self, i: slice, o: Iterable[Any]) -> None:
        ...

    @singledispatchmethod
    def __setitem__(self, i, o) -> None:
        """Set item(s) o to index (slice) i"""
        raise NotImplementedError("See specific single dispatch implementation")

    @__setitem__.register
    def _(self, i: int, o: Any) -> None:
        """Set item o to self[i] in O(log N) time

        Negative indices count from the end, as for a list.

        Raises
        ------
        IndexError
            If ``i`` is out of range.
        """
        i = self._normalize_index(i)
        self.data[i] = o
        self._restore_invariant(i)

    @__setitem__.register
    def _(self, i: slice, o: Iterable[Any]) -> None:
        """Replace the items at the sliced positions with items from o.

        Positions and new items are paired with ``zip``, so the heap never
        changes size and surplus positions or items are ignored. All values
        are written first and the heap is rebuilt once; sifting after each
        write would move items around and let a later write clobber an
        earlier one.
        """
        changed = False
        for pos, item in zip(range(len(self.data))[i], o):
            self.data[pos] = item
            changed = True
        if changed:
            self.heapify()

    def remove(self, item, lazy=True):
        """Deletes item from heap

        If lazy, mark item for deletion in self._to_remove. A marked item is
        only physically deleted once it reaches the top of the heap, which is
        checked right away and again after pop / pushpop / replace, so the
        deletion costs O(log N) amortised. The item must be hashable.
        NOTE(review): a lazy mark is not checked against the heap contents, so
        marking an absent item will drop a later equal item when it surfaces.

        If not lazy, deletes the first occurrence of item immediately in
        O(N) time.

        Parameters
        ----------
        item : Any
        lazy : bool

        Raises
        ------
        ValueError
            If not lazy and item is not in the heap.
        """
        if lazy:
            self._to_remove[item] += 1
            self._lazy_remove()
        else:
            del self[self.index(item)]

    @abstractmethod
    def _lazy_remove(self):
        """Drop marked items from the top of the heap.

        This default implementation pops with ``heappop`` (min-heap order).
        Each pop is O(log N).
        """
        pending: Counter = self._to_remove
        heap = self.data
        while heap and heap[0] in pending:
            gone = heappop(heap)
            pending[gone] -= 1
            if pending[gone] == 0:
                # Forget exhausted marks so the membership test stays exact.
                del pending[gone]
class MinHeap(_BasePriorityQueue):
    """Min Heap -- The min element is always at index 0

    All methods maintain min heap invariant. Every mutating method finishes
    by purging lazily-removed items from the top, so ``self[0]`` is never an
    item that is currently marked for removal.
    """

    def __init__(self, queue: Optional[Iterable] = None):
        """Initialize a MinHeap from a sequence or iterable"""
        super().__init__(queue)

    def _lazy_remove(self):
        """Execute lazy remove, the actual cleaning step in O(log N) time"""
        # The base implementation already pops in min-heap order.
        super()._lazy_remove()

    def heapify(self):
        """Construct the min heap itself in linear time"""
        heapify(self.data)
        self._lazy_remove()

    def push(self, element: Any):
        """Push an element to the MinHeap"""
        heappush(self.data, element)
        # Purge after pushing, as every other mutator does (and as
        # MaxHeap.push does through siftdown), so a marked item can never be
        # left sitting at the top.
        self._lazy_remove()

    def pop(self) -> Any:
        """Pops the top min element

        Raises
        ------
        IndexError
            If the heap is empty.
        """
        smallest = heappop(self.data)
        self._lazy_remove()
        return smallest

    def pushpop(self, element: Any) -> Any:
        """Push an element and pop the minimum element"""
        smallest = heappushpop(self.data, element)
        self._lazy_remove()
        return smallest

    def replace(self, element: Any) -> Any:
        """Pop the min element and push the new element

        Raises
        ------
        IndexError
            If the heap is empty.
        """
        smallest = heapreplace(self.data, element)
        self._lazy_remove()
        return smallest

    def siftup(self, pos: int):
        """Sift the smaller children up, starting from pos till leaf"""
        _siftup(self.data, pos)
        if pos == 0:
            # Only a sift that starts at the root can change the top item.
            self._lazy_remove()

    def siftdown(self, start_pos: int, pos: int):
        """Sift the larger parent down, starting from pos till start_pos"""
        _siftdown(self.data, start_pos, pos)
        if start_pos == 0:
            # The moved item may have travelled all the way up to the root.
            self._lazy_remove()
class MaxHeap(_BasePriorityQueue):
    """Max Heap -- The max element is always at index 0

    All methods maintain max heap invariant. Every mutating method finishes
    by purging lazily-removed items from the top.
    """

    def __init__(self, queue: Optional[Iterable] = None):
        """Initialize a MaxHeap from a sequence or iterable"""
        super().__init__(queue)

    def _lazy_remove(self):
        """Execute lazy remove, the actual cleaning step in O(log N) time"""
        pending = self._to_remove
        heap = self.data
        while heap and heap[0] in pending:
            gone = _heappop_max(heap)
            pending[gone] -= 1
            if pending[gone] == 0:
                # Forget exhausted marks so the membership test stays exact.
                del pending[gone]

    def heapify(self):
        """Construct the max heap itself in linear time"""
        _heapify_max(self.data)
        self._lazy_remove()

    def push(self, element: Any):
        """Push an element to the MaxHeap"""
        self.data.append(element)
        # Bubble the new last item towards the root; siftdown also purges
        # lazily-removed items because it is anchored at position 0.
        self.siftdown(0, len(self.data) - 1)

    def pop(self) -> Any:
        """Pops the top max element

        Raises
        ------
        IndexError
            If the heap is empty.
        """
        largest = _heappop_max(self.data)
        self._lazy_remove()
        return largest

    def pushpop(self, element: Any) -> Any:
        """Push an element and pop the max element

        If ``element`` is not smaller than the current top (or the heap is
        empty) it is returned unchanged and the heap is left untouched.
        """
        heap = self.data
        if heap and element < heap[0]:
            # Swap with the root in a single sift. Assigning through
            # ``self[0] = ...`` and sifting again would repeat the work and
            # could index into a heap that the lazy purge has just emptied.
            element = _heapreplace_max(heap, element)
            self._lazy_remove()
        return element

    def replace(self, element: Any) -> Any:
        """Pop the max element and push the new element

        Raises
        ------
        IndexError
            If the heap is empty.
        """
        largest = _heapreplace_max(self.data, element)
        self._lazy_remove()
        return largest

    def siftup(self, pos: int):
        """Sift the larger children up, starting from pos till leaf"""
        _siftup_max(self.data, pos)
        if pos == 0:
            # Only a sift that starts at the root can change the top item.
            self._lazy_remove()

    def siftdown(self, start_pos: int, pos: int):
        """Sift the smaller parent down, starting from pos till start_pos"""
        _siftdown_max(self.data, start_pos, pos)
        if start_pos == 0:
            # The moved item may have travelled all the way up to the root.
            self._lazy_remove()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment