@dound · Created May 5, 2010 20:45
application: dummy-for-nose-gae
version: test
runtime: python
api_version: 1
handlers:
- url: /.*
  script: request.py
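The app.yaml above routes every request to request.py, which is not included in the gist; the app exists only as a dummy so the doctests below can run under nose-gae. A minimal handler compatible with this app.yaml might look like the following sketch (the class name and response body are assumptions, not from the gist):

# request.py (hypothetical): minimal handler matching the app.yaml above.
from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app

class DummyHandler(webapp.RequestHandler):
    def get(self):
        self.response.out.write('ok')

def main():
    run_wsgi_app(webapp.WSGIApplication([('/.*', DummyHandler)]))

if __name__ == '__main__':
    main()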
"""A high performance counter without sharding.
Original Version: http://appengine-cookbook.appspot.com/recipe/high-concurrency-counters-without-sharding
Improvements:
1) Off by one: the original's deferred memcache.decr() subtracts count, where
count is the current memcache value **plus 1**, so the memcache value is
decremented by 1 too much. That loses a count if the counter is incremented
between the time the memcache.decr() task is enqueued and when it actually
runs (otherwise it isn't a problem, since the value won't decrease below 0).
2) memcache.decr() doesn't need to be a transactional task. memcache.decr()
could still "fail" (it returns None to indicate failure, but this won't
cause the task to be retried).
3) The increment amount can now be tweaked (the default is still 1).
4) CounterModel is explicitly defined.
5) Replaced db.get with Model.get_by_key_name since key is a str not db.Key.
Limitations:
1) Like the original code, this may undercount if memcache evicts a counter
which hasn't been persisted to the db (e.g., a spike in traffic followed by
no traffic). It may also undercount if memcache.incr() fails.
2) This may double-count if a memcache.decr() fails. But this event seems
very unlikely. If it does happen, it will be logged.
3) To get the count, you must fetch it from the datastore (the memcache
counter only has the *change* to the datastore's value).
# initialize the empty counter in the datastore
>>> time = __import__('time')
>>> key = "test"
>>> update_interval = 1 # for testing purposes, flush to datastore after 1sec
# increment the counter (this will go to the datastore)
>>> incrementCounter(key, update_interval=update_interval)
>>> memcache.get("counter_val:%s" % key) is None
True
>>> CounterModel.get_by_key_name(key).counter
1L
# increment it some more (won't go to the datastore: update_interval hasn't passed)
>>> incrementCounter(key, update_interval=update_interval)
>>> incrementCounter(key, delta=5, update_interval=update_interval)
>>> memcache.get("counter_val:%s" % key)
'6'
>>> CounterModel.get_by_key_name(key).counter
1L
# wait for the update_interval to expire
>>> time.sleep(1.0)
# this should go to the datastore
>>> incrementCounter(key, update_interval=update_interval)
>>> memcache.get("counter_val:%s" % key)
'0'
>>> CounterModel.get_by_key_name(key).counter
8L
# increment it some more (won't go to the datastore: update_interval hasn't passed)
>>> incrementCounter(key, update_interval=update_interval)
>>> memcache.get("counter_val:%s" % key)
'1'
>>> CounterModel.get_by_key_name(key).counter
8L
# simulate the memcache entry being evicted => will undercount
>>> memcache.flush_all()
True
>>> memcache.get("counter_val:%s" % key) is None
True
>>> CounterModel.get_by_key_name(key).counter
8L
# simulate memcache.decr() failing => will double count
>>> incrementCounter(key, update_interval=update_interval) # goes to db
>>> incrementCounter(key, update_interval=update_interval) # memcache only
>>> time.sleep(1.0)
>>> memcache.decr_real = memcache.decr
>>> memcache.decr = lambda key, delta : None
>>> incrementCounter(key, update_interval=update_interval) # to db, but decr fails
>>> memcache.decr = memcache.decr_real
# memcache.decr() failed, but the db was updated => next update will double-count hits between this db write and prev db write
>>> memcache.get("counter_val:%s" % key)
'1'
>>> CounterModel.get_by_key_name(key).counter
11L
"""
import logging
from google.appengine.api import memcache
from google.appengine.ext import db
class CounterModel(db.Model):
    counter = db.IntegerProperty()

def incrementCounter(key, delta=1, update_interval=10):
    """Increments a memcached counter.
    Args:
      key: The key of a datastore entity that contains the counter.
      delta: Non-negative integer (int or long) to increment the counter by; defaults to 1.
      update_interval: Minimum interval between datastore updates, in seconds.
    """
    lock_key = "counter_lock:%s" % (key,)
    count_key = "counter_val:%s" % (key,)
    # memcache.add() only succeeds if the key is absent, so it acts as a lock
    # which expires after update_interval seconds: whichever request acquires
    # it flushes the buffered count to the datastore.
    if memcache.add(lock_key, None, time=update_interval):
        # Time to update the DB.
        prev_count = int(memcache.get(count_key) or 0)
        new_count = prev_count + delta
        def tx():
            entity = CounterModel.get_by_key_name(key)
            if entity is None:
                entity = CounterModel(key_name=key, counter=0)
            entity.counter += new_count
            entity.put()
        try:
            db.run_in_transaction(tx)
            # Remove only what we persisted; increments which raced in after
            # the get() above stay buffered in memcache.
            if prev_count > 0 and memcache.decr(count_key, delta=prev_count) is None:
                logging.warn("counter %s could not be decremented (will double-count): %d" % (key, prev_count))
        except Exception:
            # db failed to update: we'll try again later; just add delta to memcache like usual for now
            memcache.incr(count_key, delta, initial_value=0)
    else:
        # Just update memcache.
        memcache.incr(count_key, delta, initial_value=0)
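As a usage sketch (the function and key names below are illustrative, not part of the gist), a page-hit counter built on incrementCounter, plus a reader for the persisted total, might look like:

# Hypothetical usage of incrementCounter/CounterModel above.
def record_hit(page_name):
    # Cheap in the common case: one memcache.incr(); at most one datastore
    # transaction per update_interval seconds per counter.
    incrementCounter(page_name, update_interval=10)

def get_hits(page_name):
    # Returns only the persisted total; memcache holds just the
    # not-yet-flushed delta (limitation 3 in the module docstring).
    entity = CounterModel.get_by_key_name(page_name)
    if entity is None:
        return 0
    return entity.counter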
"""A high performance multi-counter without sharding.
Adapted from code I left in a comment on http://appengine-cookbook.appspot.com/recipe/high-concurrency-counters-without-sharding
Differences from the normal counter:
1) Stores 4 counters together.
2) The 4 counters are atomically updated together.
Limitations: same as gae_high_performance_counters.py
Note: You can adapt this code to store N counters together - but remember that
you only have 64 bits to pack them into (memcache limitation).
# initialize the empty counter in the datastore
>>> time = __import__('time')
>>> key = "test"
>>> update_interval = 1 # for testing purposes, flush to datastore after 1sec
# increment the counter (this will go to the datastore)
>>> incrementMultiCounter(key, 1, 0, 0, 0, update_interval=update_interval)
>>> memcache.get("ctr_V:%s" % key) is None
True
>>> MultiCounterModel.get_by_key_name(key).a
1L
# increment it some more (won't go to the datastore: update_interval hasn't passed)
>>> incrementMultiCounter(key, 5, 1, 2, 3, update_interval=update_interval)
>>> incrementMultiCounter(key, 1, 1, 1, 1, update_interval=update_interval)
>>> unpack_counts(int(memcache.get("ctr_V:%s" % key)))
(6L, 2L, 3L, 4L)
>>> str(MultiCounterModel.get_by_key_name(key))
'1 0 0 0'
# wait for the update_interval to expire
>>> time.sleep(1.0)
# this should go to the datastore
>>> incrementMultiCounter(key, 1, 2, 3, 4, update_interval=update_interval)
>>> unpack_counts(int(memcache.get("ctr_V:%s" % key)))
(0, 0, 0, 0)
>>> str(MultiCounterModel.get_by_key_name(key))
'8 4 6 8'
# increment it some more (won't go to the datastore: update_interval hasn't passed)
>>> incrementMultiCounter(key, 4, 3, 2, 1, update_interval=update_interval)
>>> unpack_counts(int(memcache.get("ctr_V:%s" % key)))
(4L, 3L, 2L, 1L)
>>> str(MultiCounterModel.get_by_key_name(key))
'8 4 6 8'
# simulate the memcache entry being evicted => will undercount
>>> memcache.flush_all()
True
>>> memcache.get("ctr_V:%s" % key) is None
True
>>> str(MultiCounterModel.get_by_key_name(key))
'8 4 6 8'
# simulate memcache.decr() failing => will double count
>>> incrementMultiCounter(key, 1, 1, 1, 1, update_interval=update_interval) # goes to db
>>> incrementMultiCounter(key, 1, 2, 2, 1, update_interval=update_interval) # memcache only
>>> time.sleep(1.0)
>>> memcache.decr_real = memcache.decr
>>> memcache.decr = lambda key, delta : None
>>> incrementMultiCounter(key, 1, 1, 1, 1, update_interval=update_interval) # to db, but decr fails
>>> memcache.decr = memcache.decr_real
# memcache.decr() failed, but the db was updated => next update will double-count hits between this db write and prev db write
>>> unpack_counts(int(memcache.get("ctr_V:%s" % key)))
(1L, 2L, 2L, 1L)
>>> str(MultiCounterModel.get_by_key_name(key))
'11 8 10 11'
"""
import datetime
import logging
from google.appengine.api import memcache
from google.appengine.ext import db

def dt_today():
    """Returns the datetime representing today's date at midnight.

    (Only used by the commented-out per-day key construction in
    incrementMultiCounter below.)
    """
    d = datetime.date.today()
    return datetime.datetime(d.year, d.month, d.day)
# NOTE: We pack into a 64-bit value because memcache.incr()/decr() operate on
# 64-bit values; anything larger wouldn't work properly.
def pack_counts(a, b, c, d):
    """Packs a, b, c, d into one 64-bit long (as 4 16-bit ints). Assumes each
    is in the range [0, 2**16-1]."""
    return (a << 0) + (b << 16) + (c << 32) + (d << 48)

def unpack_counts(t):
    """Unpacks 4 16-bit values from t."""
    a = t & 0xFFFF
    b = (t >> 16) & 0xFFFF
    c = (t >> 32) & 0xFFFF
    d = t >> 48
    return (a, b, c, d)
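# Worked example (illustrative): each counter occupies its own 16-bit field,
# so packing and unpacking round-trip exactly as long as every field stays in
# [0, 2**16-1]:
#   pack_counts(6, 2, 3, 4)                 == 6 + (2<<16) + (3<<32) + (4<<48)
#   unpack_counts(pack_counts(6, 2, 3, 4))  == (6, 2, 3, 4)
# If a field overflows 16 bits, the carry silently corrupts its neighbor:
#   unpack_counts(pack_counts(2**16, 0, 0, 0)) == (0, 1, 0, 0)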
class MultiCounterModel(db.Model):
    a = db.IntegerProperty()
    b = db.IntegerProperty()
    c = db.IntegerProperty()
    d = db.IntegerProperty()
    def __str__(self):
        return '%d %d %d %d' % (self.a, self.b, self.c, self.d)
def incrementMultiCounter(key='nil', d_share=0, d_tag=0, d_claim=0, d_penalty=0, update_interval=10):
    """Increments the four memcached counters associated with key.
    Args:
      key: The key of a datastore entity that contains the counters.
      d_share, d_tag, d_claim, d_penalty: Non-negative integer deltas for the
        four counters (stored as fields a, b, c, d respectively).
      update_interval: Minimum interval between datastore updates, in seconds.
    """
    # The commented-out lines below show the per-race/per-user/per-day key
    # construction this code was adapted from:
    #today = dt_today()
    #key = "%s:%s_%s" % (today.strftime('%Y-%m-%d'), race_id, user_id)
    lock_key = "ctr_L:%s" % (key,)
    counts_key = "ctr_V:%s" % (key,)
    delta = pack_counts(d_share, d_tag, d_claim, d_penalty)
    if memcache.add(lock_key, None, time=update_interval):
        # Time to update the DB.
        prev_packed_counts = int(memcache.get(counts_key) or 0)
        new_counts = unpack_counts(prev_packed_counts + delta)
        def tx():
            entity = MultiCounterModel.get_by_key_name(key)
            if entity is None:
                entity = MultiCounterModel(key_name=key, a=0, b=0, c=0, d=0)
            entity.a += new_counts[0]
            entity.b += new_counts[1]
            entity.c += new_counts[2]
            entity.d += new_counts[3]
            entity.put()
        try:
            db.run_in_transaction(tx)
            if prev_packed_counts > 0 and memcache.decr(counts_key, delta=prev_packed_counts) is None:
                logging.warn("counter %s could not be decremented (will double-count): %s" % (key, unpack_counts(prev_packed_counts)))
        except db.Error:
            # db failed to update: we'll try again later; just add delta to memcache like usual for now
            memcache.incr(counts_key, delta, initial_value=0)
    else:
        # Just update memcache.
        memcache.incr(counts_key, delta, initial_value=0)
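As with the single counter, a usage sketch (the race/user key scheme mirrors the commented-out code in incrementMultiCounter; the function names are illustrative):

# Hypothetical usage of incrementMultiCounter/MultiCounterModel above.
def record_share(race_id, user_id):
    # Bump only the "share" counter (field a) for this race/user pair.
    incrementMultiCounter('%s_%s' % (race_id, user_id), d_share=1)

def get_contributions(race_id, user_id):
    # Returns the persisted (share, tag, claim, penalty) totals; any delta
    # still buffered in memcache under ctr_V:<key> is not included.
    entity = MultiCounterModel.get_by_key_name('%s_%s' % (race_id, user_id))
    if entity is None:
        return (0, 0, 0, 0)
    return (entity.a, entity.b, entity.c, entity.d)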
dound commented on May 5, 2010:

Commit 8b1063 is the revised recipe by me. It includes a doctest which can be run with nosegae.