Skip to content

Instantly share code, notes, and snippets.

@yoshrote
Created October 1, 2017 11:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yoshrote/5cff8f0da83081acb75ddc1fb28f9ea4 to your computer and use it in GitHub Desktop.
Save yoshrote/5cff8f0da83081acb75ddc1fb28f9ea4 to your computer and use it in GitHub Desktop.
distributed transaction
import datetime
import json
class DistributedTransaction(object):
    """Stage per-record changesets in a shared cache and apply them together.

    Changesets are stored as JSON blobs in the hash at ``key`` (one field per
    primary key).  An integer counter at ``semaphore:<key>`` is decremented by
    :meth:`aquire` and incremented by :meth:`add`; when an ``add`` brings the
    counter back to zero or above, ``"done"`` is published on the
    ``transaction:<key>`` channel.

    ``cache`` must provide ``pipeline``, ``publish``, ``decr`` and
    ``hgetall``; presumably it is a redis-py client -- confirm with callers.
    """

    # NOTE(review): no method in this class references TTL, so the staged
    # hash is never given an expiry here.
    TTL = datetime.timedelta(seconds=30)

    def __init__(self, cache, key):
        """Bind the transaction to ``cache`` and to the hash stored at ``key``."""
        self.cache = cache
        self.key = key

    def _semaphore_key(self):
        """Return the name of the counter tracking outstanding participants."""
        return "semaphore:{}".format(self.key)

    def _channel(self):
        """Return the pub/sub channel used for "done" / "abort" messages."""
        return "transaction:{}".format(self.key)

    def add(self, pk, changeset):
        """CAS to add a changeset to an ongoing transaction.

        Merges ``changeset`` (a dict) into whatever is already staged for
        ``pk`` and increments the semaphore in the same MULTI/EXEC.  The
        read-modify-write is retried whenever the watched hash changes
        underneath it.  Publishes ``"done"`` once the semaphore is >= 0.
        """
        with self.cache.pipeline() as pipe:
            while True:
                try:
                    pipe.watch(self.key)
                    # After watch() the read below must return a value
                    # immediately; commands issued after multi() are queued.
                    staged = json.loads(pipe.hget(self.key, pk) or "{}")
                    staged.update(changeset)
                    pipe.multi()
                    pipe.hset(self.key, pk, json.dumps(staged))
                    pipe.incr(self._semaphore_key())
                    results = pipe.execute()
                # NOTE(review): WatchError is not imported in the visible
                # file; presumably redis.exceptions.WatchError -- confirm and
                # import it at module level, otherwise a conflict surfaces as
                # NameError instead of a retry.
                except WatchError:
                    continue
                # The write succeeded exactly once: leave the retry loop
                # whether or not this was the last participant, so the
                # changeset and the semaphore increment are never re-applied.
                if results[-1] >= 0:
                    self.cache.publish(self._channel(), "done")
                return

    def aquire(self):
        """Register a participant by decrementing the semaphore counter."""
        self.cache.decr(self._semaphore_key())

    # Correctly spelled alias; ``aquire`` is kept for existing callers.
    acquire = aquire

    def abort(self):
        """Publish ``"abort"`` and delete the staged hash in one pipeline.

        NOTE(review): the ``semaphore:<key>`` counter is not deleted here --
        confirm whether a stale counter can affect a later transaction that
        reuses the same key.
        """
        with self.cache.pipeline() as pipe:
            pipe.publish(self._channel(), "abort")
            pipe.delete(self.key)
            pipe.execute()

    def commit(self, adapter):
        """Apply every staged changeset through ``adapter.patch(pk, patch)``.

        ``hgetall`` yields a mapping, so it is iterated with ``.items()``;
        iterating the mapping directly would produce only the field names.
        """
        for pk, pending_blob in self.cache.hgetall(self.key).items():
            adapter.patch(pk, json.loads(pending_blob))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment