Skip to content

Instantly share code, notes, and snippets.

@bmwalters
Created October 9, 2021 22:45
Show Gist options
  • Save bmwalters/668e19cbbc068d386331e7d36b040482 to your computer and use it in GitHub Desktop.
First attempt at a skip list
from typing import List, Optional
import random
class Node:
    """A single cell of the skip list.

    Each node carries a ``value`` tagged with a ``timestamp`` and two links:
    ``next`` (the following node on the same level) and ``lower`` (the node
    one level below). Both links start out as ``None``.
    """

    value: str
    timestamp: int
    next: Optional['Node']
    lower: Optional['Node']

    def __init__(self, value: str, timestamp: int):
        self.value = value
        self.timestamp = timestamp
        self.next = None
        self.lower = None

    def __repr__(self) -> str:
        # Links are deliberately omitted so printing one node never walks
        # the whole structure.
        return (
            f"{type(self).__name__}"
            f"(value={self.value!r}, timestamp={self.timestamp!r})"
        )
class SkipList:
    """Skip list of ``(timestamp, value)`` entries ordered by timestamp.

    The structure has ``P`` levels. ``self.head`` is the first node of the
    top level and following ``lower`` links from it reaches the first node
    of every level below. The head column always holds the entry with the
    smallest timestamp, so every search can start there.
    """

    def __init__(self, P: int, value: str, timestamp: int):
        """Create a list with ``P`` levels seeded with one entry.

        Args:
            P: Number of levels; must be positive.
            value: Value of the initial entry.
            timestamp: Timestamp of the initial entry.

        Raises:
            AssertionError: If ``P`` is not positive.
        """
        assert P > 0, "P (number of levels) must be positive"
        self.P = P
        self.head = Node(value, timestamp)
        # Build the head column: one node per level, linked top to bottom.
        column = self.head
        for _ in range(P - 1):
            column.lower = Node(value, timestamp)
            column = column.lower

    def _splice(
        self,
        predecessors: List['Node'],
        bottom: 'Node',
        value: str,
        timestamp: int,
    ) -> None:
        """Link a new entry after ``bottom`` and randomly promote it.

        ``predecessors`` lists, top level first, the node on each upper
        level after which a promoted copy must be linked. ``bottom`` is the
        predecessor on the lowest level, where the entry is always linked.
        """
        new = Node(value, timestamp)
        new.next = bottom.next
        bottom.next = new
        # Walk the upper levels from the lowest one upwards; each promotion
        # is a fair coin flip and the first failure stops the climb.
        for pred in reversed(predecessors):
            if not random.choice((True, False)):
                break
            promoted = Node(value, timestamp)
            promoted.next = pred.next
            pred.next = promoted
            promoted.lower = new
            new = promoted

    def insert(self, value: str, timestamp: int) -> None:
        """Insert ``value`` at ``timestamp``, keeping each level ordered.

        An entry whose timestamp equals an existing one is placed after it.
        """
        if timestamp < self.head.timestamp:
            # The new entry becomes the smallest one. The head column must
            # stay in place, so overwrite it with the new entry and then
            # re-insert the previous head entry directly behind it.
            old_value = self.head.value
            old_timestamp = self.head.timestamp
            column: List['Node'] = []
            node = self.head
            while node is not None:
                column.append(node)
                node.value = value
                node.timestamp = timestamp
                node = node.lower
            self._splice(column[:-1], column[-1], old_value, old_timestamp)
            return

        # From here on timestamp >= head timestamp: move right while the
        # next node is not past the timestamp, otherwise drop one level,
        # remembering where we dropped so promotions can be linked there.
        predecessors: List['Node'] = []
        current = self.head
        while True:
            following = current.next
            if following is not None and timestamp >= following.timestamp:
                current = following
            elif current.lower is not None:
                predecessors.append(current)
                current = current.lower
            else:
                self._splice(predecessors, current, value, timestamp)
                return

    def query(self, timestamp: int) -> Optional[str]:
        """Return the value of the last entry at or before ``timestamp``.

        Returns ``None`` when ``timestamp`` precedes every stored entry.
        """
        current = self.head
        # The head holds the smallest timestamp, so this is the only place
        # where the search can come up empty.
        if current.timestamp > timestamp:
            return None
        while True:
            following = current.next
            if following is not None and following.timestamp <= timestamp:
                current = following
            elif current.lower is not None:
                current = current.lower
            else:
                return current.value

    def __str__(self) -> str:
        """Render one line per level (top first) of space-separated timestamps."""
        rows = []
        level_head = self.head
        while level_head is not None:
            cells = []
            node = level_head
            while node is not None:
                cells.append(str(node.timestamp) + " ")
                node = node.next
            rows.append("".join(cells) + "\n")
            level_head = level_head.lower
        return "".join(rows)
def main() -> None:
    """Demo: build a 4-level skip list, print it, then query 0 through 20."""
    stamps = iter([16, 1, 15, 3, 9, 8, 2, 20])
    first = next(stamps)
    print("inserting", first)
    skip_list = SkipList(4, str(first), first)
    for stamp in stamps:
        print("inserting", stamp)
        skip_list.insert(str(stamp), stamp)
    # NOTE(review): the original indentation was lost, so it is unclear
    # whether the list was dumped after every insert or once at the end;
    # this prints it once after all inserts - confirm against the intent.
    print(str(skip_list))
    for i in range(21):
        print(i, skip_list.query(i))


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment