Skip to content

Instantly share code, notes, and snippets.

@josephg
Last active March 7, 2024 09:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save josephg/053c7e42ffebf7e91e966fe3b0cdcebc to your computer and use it in GitHub Desktop.
Save josephg/053c7e42ffebf7e91e966fe3b0cdcebc to your computer and use it in GitHub Desktop.
Graph diff
from dataclasses import dataclass
from enum import Enum
import heapq
Version = set[int] # The numbers in this set are indexes into the list of events in the graph.
@dataclass
class GraphEntry:
parents: set[int] # Set of indexes...
id: any
# And the event data.
@dataclass
class DiffResult:
a_only: list[int]
b_only: list[int]
class CausalGraph:
entries: list[GraphEntry]
def diff(self, a: Version, b: Version) -> DiffResult:
class DiffFlag(Enum):
A = 0
B = 1
SHARED = 2
flags: dict[int, DiffFlag] = {}
# We need a max queue, so all entries are negative.
queue: list[int] = []
# number of items in the queue in state `SHARED`
num_shared = 0
def enqueue(lv: int, f: DiffFlag):
nonlocal num_shared, flags
if lv not in flags:
heapq.heappush(queue, -lv)
flags[lv] = f
if f == DiffFlag.SHARED:
num_shared += 1
else:
current_flag = flags[lv]
if f != current_flag and current_flag != DiffFlag.SHARED:
# This is sneaky. If the two types are different they have to be {A,B},
# {A,Shared} or {B,Shared}. In any of those cases the final result is
# Shared. If the current type isn't shared, set it as such.
flags[lv] = DiffFlag.SHARED
num_shared += 1
for v in a:
enqueue(v, DiffFlag.A)
for v in b:
enqueue(v, DiffFlag.B)
a_only: list[int] = []
b_only: list[int] = []
while len(queue) > num_shared:
v = -heapq.heappop(queue)
flag = flags[v]
if flag == DiffFlag.SHARED:
num_shared -= 1
# If there are other heap entries pointing to the same place, consume them.
while queue and queue[0] == -v:
heapq.heappop(queue)
if flag == DiffFlag.SHARED:
num_shared -= 1
if flag != DiffFlag.SHARED:
if flag == DiffFlag.A:
a_only.append(v)
else:
b_only.append(v)
e = self.entries[v]
for p in e.parents:
enqueue(p, flag)
a_only.reverse()
b_only.reverse()
return DiffResult(a_only, b_only)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment