Skip to content

Instantly share code, notes, and snippets.

@davidscherer
Last active March 22, 2021 13:23
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save davidscherer/2fb9aa34048c75470fec879df3c53f2a to your computer and use it in GitHub Desktop.
Save davidscherer/2fb9aa34048c75470fec879df3c53f2a to your computer and use it in GitHub Desktop.
Crude document database with changefeeds, for illustrative purposes
import fdb, uuid, json, struct, sys
fdb.api_version(500)
class Collection:
    """A crude document collection stored in FoundationDB, with changefeeds.

    Documents are JSON-encoded and stored under the collection's directory,
    keyed by primary key. Every non-primary index gets an "index:<name>"
    directory holding ``[value][pk]`` keys, and every index (the primary key
    included) gets a "changefeed:<name>" directory.

    Layout of one changefeed, rooted at ``changefeeds[index][value]``:
      * the root key itself -- created when a listener registers; writers
        only post to the feed when this key is present, and overwrite it
        with a fresh value so that watches on it fire
      * ``[1][cfid]``         -- one key per registered listener
      * ``[2][versionstamp]`` -- log entries whose value is the packed pk

    Relies on the module-level globals ``db`` and ``root``, which must exist
    before a Collection is instantiated.
    """

    def __init__(self,
                 name,
                 indexes):
        """Open (or create) the directories backing this collection.

        name    -- directory name of the collection under ``root``
        indexes -- dict mapping index name to a projection function
                   ``document -> value``; must contain "primary_key"
        """
        self.name = name
        self.dir = root.create_or_open(db, name)
        self.pk = indexes["primary_key"]
        self.indexes = {
            index_name: (proj, self.dir.create_or_open(db, "index:" + index_name))
            for index_name, proj in indexes.items()
            if index_name != "primary_key"
        }
        self.changefeeds = {
            index_name: self.dir.create_or_open(db, "changefeed:" + index_name)
            for index_name in indexes
        }

    @fdb.transactional
    def insert(self, tr, document):
        """Insert a new document, maintaining every index and changefeed.

        Raises Exception if a document with the same primary key is already
        stored.
        """
        pk = self.pk(document)
        doc_key = self.dir[pk]

        # Check primary key constraint. tr[...] hands back a lazy future, not
        # the value itself, so `is None` would never be true here; present()
        # is the explicit way to ask whether the key exists.
        if tr[doc_key].present():
            raise Exception("Duplicate primary key for " + self.name + ": " + repr(pk))

        # Insert into primary key index
        tr[doc_key] = json.dumps(document).encode("utf8")
        self._update_changefeed(tr, "primary_key", pk, pk)

        # Insert into other indexes
        for name, (proj, idir) in self.indexes.items():
            val = proj(document)
            tr[idir[val][pk]] = b""
            self._update_changefeed(tr, name, val, pk)

    @fdb.transactional
    def update(self, tr, document):
        """Replace the stored document that shares *document*'s primary key.

        If no document with that primary key exists, nothing is written.
        """
        pk = self.pk(document)
        old_doc = self.get(tr, pk)
        # Compare against None explicitly: get() returns None only for a
        # missing key, while a stored document may itself be falsy.
        if old_doc is None:
            return

        # Update non-primary key indexes
        for name, (proj, idir) in self.indexes.items():
            old_val = proj(old_doc)
            new_val = proj(document)
            if old_val != new_val:
                del tr[idir[old_val][pk]]
                tr[idir[new_val][pk]] = b""
                # The feed the document just left is told about the change
                # as well, so its listeners can notice the departure.
                self._update_changefeed(tr, name, old_val, pk)
            # The feed for the current value is notified on every update,
            # whether or not the indexed value itself moved.
            self._update_changefeed(tr, name, new_val, pk)

        # Update primary key index
        tr[self.dir[pk]] = json.dumps(document).encode("utf8")
        self._update_changefeed(tr, "primary_key", pk, pk)

    @fdb.transactional
    def get(self, tr, pk):
        """Return the decoded document stored under *pk*, or None if absent."""
        raw = tr[self.dir[pk]]
        if not raw.present():
            return None
        return json.loads(raw.decode("utf8"))

    @fdb.transactional
    def index_scan(self, tr, index, value):
        """Return the documents whose *index* projection equals *value*.

        *index* must name a non-primary index of this collection.
        """
        scan_subspace = self.indexes[index][1][value]
        pks = []
        for key, _ in tr[scan_subspace.range()]:
            # Each index key carries exactly one element after the value: pk.
            (pk,) = scan_subspace.unpack(key)
            pks.append(pk)
        # Request every document before decoding any of them, so no read
        # waits on the decoding of the previous one.
        pending = [tr[self.dir[pk]] for pk in pks]
        return [json.loads(raw.decode("utf8")) for raw in pending]

    @fdb.transactional
    def changefeed(self, tr, index, value, unique_cfid):
        """Register a listener on (index, value) and return a change generator.

        index       -- index name ("primary_key" or a secondary index)
        value       -- the index value to follow
        unique_cfid -- caller-chosen id identifying this listener

        The returned generator first yields an empty list; after that, each
        step blocks until the feed's watch fires and then yields the current
        versions of the documents logged since the previous step. When the
        generator exits, the listener is unregistered and, if it was the
        last one, the whole feed (root key, listeners, log) is cleared.
        """
        cf = self.changefeeds[index][value]
        cflist = cf[1]
        cflog = cf[2]

        # set up the change feed monitoring transactionally
        tr.byte_max(cf, b"")
        tr[cflist[unique_cfid]] = b""
        watch = tr.watch(cf)

        # create and call a (generator) function which will
        # execute outside the transaction `tr`, watching for changes
        # and returning them
        def tail_changefeed(database, start_ver, watch):
            @fdb.transactional
            def read_changefeed(tr, since_ver):
                # Log keys are versionstamps, so resume just past the
                # version the previous batch was read at.
                resume_at = fdb.tuple.Versionstamp(
                    struct.pack(">QH", since_ver.wait() + 1, 0))
                log = [fdb.tuple.unpack(v)[0]
                       for k, v in tr[cflog[resume_at]:cflog.range().stop]]
                # "Squash" updates by PK, keeping first-seen (log) order. The
                # log only stores the PK, so it does not hold enough
                # information to report unsquashed updates.
                log = list(dict.fromkeys(log))
                docs = [self.get(tr, pk) for pk in log]
                return docs, tr.get_read_version(), tr.watch(cf)

            # Closing the changefeed happens in a separate transaction
            @fdb.transactional
            def close_changefeed(tr):
                del tr[cflist[unique_cfid]]
                # No listeners left: drop the root key, listener list and log.
                if not list(tr.get_range(cflist, cflist.range().stop, limit=1)):
                    del tr[cf:cf.range().stop]

            try:
                docs = []
                while True:
                    yield docs
                    watch.wait()
                    docs, start_ver, watch = read_changefeed(database, start_ver)
            finally:
                close_changefeed(database)

        return tail_changefeed(tr.db, tr.get_read_version(), watch)

    def _update_changefeed(self, tr, name, val, pk):
        """Post *pk* to the changefeed for (name, val), if that feed is open.

        A feed counts as open while its root key is present.
        """
        cf = self.changefeeds[name][val]
        # check for and update changefeeds
        if tr[cf].present():
            # A fresh random value makes the root key change, which is what
            # the listeners' watches are waiting for.
            tr[cf] = fdb.tuple.pack((uuid.uuid4(),))
            # Append the pk to the log under a versionstamped key.
            tr.set_versionstamped_key(
                cf[2].pack_with_versionstamp((fdb.tuple.Versionstamp(),)),
                fdb.tuple.pack((pk,)))
# Open the database and start every run from an empty "reactive_test"
# directory, under which all collections live.
db = fdb.open()
fdb.directory.remove_if_exists(db, "reactive_test")
root = fdb.directory.create_or_open(db, "reactive_test")

# Users are indexed by primary key only.
users = Collection("users", indexes={"primary_key": lambda user: user["id"]})

# Posts are additionally indexed by the id of their author.
posts = Collection(
    "posts",
    indexes={
        "primary_key": lambda post: post["id"],
        "user": lambda post: post["user_id"],
    },
)
def show_changes(collection, index, value):
    """Follow one changefeed and write each changed document to stdout.

    Opens a changefeed on ``collection`` for ``(index, value)`` under a
    freshly generated listener id, then writes one line per document in
    every batch the feed yields. Uses the module-level ``db``.
    """
    feed = collection.changefeed(db, index, value, uuid.uuid4())
    for batch in feed:
        for changed_doc in batch:
            line = "Change to " + collection.name + ": " + str(changed_doc) + "\n"
            sys.stdout.write(line)
import threading
# Seed one user before any listener is started.
users.insert(db, {"id": 100, "name": "Alice", "profile": "public"})

# One listener thread per feed: user 100 by primary key, and the posts
# written by user 100.
# NOTE(review): the threads register their feeds asynchronously, so writes
# issued right after start() may precede registration -- confirm whether
# the demo is expected to tolerate missing those.
for feed_args in ((users, "primary_key", 100), (posts, "user", 100)):
    threading.Thread(target=show_changes, args=feed_args).start()

# Writes that the listeners above may report.
users.insert(db, {"id": 101, "name": "Bob", "profile": "private"})
posts.insert(db, {"id": 1000, "user_id": 100, "subject": "Hi!"})
posts.insert(db, {"id": 1001, "user_id": 100, "subject": "Question"})
users.update(db, {"id": 100, "name": "Alice the Great", "profile": "public"})
posts.update(db, {"id": 1001, "user_id": 100, "subject": "Question??"})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment