Skip to content

Instantly share code, notes, and snippets.

@ericmoritz
Created March 22, 2011 21:04
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 7 You must be signed in to fork a gist
  • Save ericmoritz/882055 to your computer and use it in GitHub Desktop.
Save ericmoritz/882055 to your computer and use it in GitHub Desktop.
A counter stored in Riak that handles conflict resolutions on read
import logging
import riak
log = logging.getLogger(__name__)
class RiakCounter(object):
def __init__(self, bucket, key):
self.bucket = bucket
self.bucket.set_allow_multiples(True)
self.key = key
def add(self, step, obj=None):
if obj is None:
obj = self._get_with_resolution()
data = obj.get_data()
data['last'] = data['last'] + data['step']
data['step'] = step
log.debug(data)
obj.set_data(data)
obj.store()
def reset(self, start=0):
obj = self._get_with_resolution()
data = obj.get_data()
data['last'] = start
data['step'] = 0
log.debug(data)
obj.set_data(data)
obj.store()
def _get_with_resolution(self):
obj = self.bucket.get(self.key)
# If the object doesn't exist, return a new object with
# a value of zero
# If the object has siblings, resolve the conflict
if obj.has_siblings():
first_obj = obj.get_sibling(0)
first = first_obj.get_data()
# For each sibling, apply the steps to the first's steps
for sibling_obj in map(obj.get_sibling,
range(1, obj.get_sibling_count())):
sibling = sibling_obj.get_data()
first['step'] += sibling['step']
# Update the data with the new last and step values
first_obj.set_data(first)
log.debug("300: %s" % (first_obj.get_data()))
return first_obj
elif not obj.exists():
obj = self.bucket.new(self.key, data={'last': 0,
'step': 0})
log.debug("404: %s" % (obj.get_data()))
return obj
else:
log.debug("200: %s" % (obj.get_data()))
return obj
@property
def value(self):
obj = self._get_with_resolution()
data = obj.get_data()
return data['last'] + data['step']
from counter import RiakCounter
import logging
import riak
logging.basicConfig(level=logging.DEBUG)
# Create 3 independent counters
c1 = RiakCounter(riak.RiakClient(port=8098).bucket("test"), "counter")
c2 = RiakCounter(riak.RiakClient(port=8098).bucket("test"), "counter")
c3 = RiakCounter(riak.RiakClient(port=8098).bucket("test"), "counter")
# Counter 1 will increment the counter to three
c1.reset() # 0
c1.add(1) # 1
c1.add(1) # 2
c1.add(1) # 3
assert c1.value == 3, c1.value
# Counter 2 and Counter 3 will increment the counter in parallel
# Split reality by fetching two copies of the some object
o2 = c2.bucket.get(c2.key)
o3 = c3.bucket.get(c3.key)
# Attempt to update the values using that copy, this should break the
# space/time continuum.
c2.add(1, obj=o2) # 4
c3.add(10, obj=o3) # 14
# c1 should get a conflicted value and the conflict should be resolved with the value returned.
assert c1.value == 14, c1.value
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment