Skip to content

Instantly share code, notes, and snippets.

@billy1380
Last active June 27, 2017 16:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save billy1380/7500cfa630d4a32825ae to your computer and use it in GitHub Desktop.
Save billy1380/7500cfa630d4a32825ae to your computer and use it in GitHub Desktop.
fork join counter
# https://www.youtube.com/watch?v=zSDC_TU7rtc
def insert(sum_name, delta):
    """Record ``delta`` as a pending work item for ``sum_name`` (fork side).

    The current batch index for ``sum_name`` is read from memcache, this
    caller registers as a writer on the per-batch lock counter, a ``MyWork``
    entity carrying ``delta`` is stored, and a task posting to ``/work`` is
    enqueued about one second in the future.

    Args:
        sum_name: Name of the sum being accumulated; used to build the
            memcache keys, the work index and the task name.
        delta: Amount stored on the ``MyWork`` entity.

    Returns:
        True when the work item was stored; False when the insert was
        refused and the caller should try again (the batch is already being
        joined, or memcache could not supply an index / lock counter).

    Raises:
        Whatever ``MyWork.put()`` or ``taskqueue.add()`` raise, other than
        ``taskqueue.TaskAlreadyExistsError``. The writer count is released
        before the exception propagates.
    """
    index_key = 'index-' + sum_name
    index = memcache.get(index_key)
    if index is None:
        # add() is used rather than set() so a concurrent writer's value is
        # not overwritten; re-read to pick up whichever value is stored.
        memcache.add(index_key, 1)
        index = memcache.get(index_key)
        if index is None:
            # memcache could not give us an index; formatting None with %d
            # below would raise TypeError, so report "try again" instead.
            return False

    lock = '%s-lock-%d' % (sum_name, index)
    # The counter starts at 2 ** 16. join() subtracts 2 ** 15 from it, so a
    # value below 2 ** 16 means this batch is already being joined.
    writers = memcache.incr(lock, initial_value=2 ** 16)
    if writers is None or writers < 2 ** 16:
        memcache.decr(lock)
        return False  # Insert fails, try again

    try:
        work = MyWork(delta=delta,
                      work_index='%s-%d' % (sum_name, knuth_hash(index)))
        work.put()
        now = time.time()
        try:
            # The name contains the 30-second window and the batch index, so
            # every insert in the same window/batch maps to one task name.
            taskqueue.add(
                name='%s-%d-%d' % (sum_name, int(now / 30), index),
                url='/work',
                eta=(datetime.datetime.utcfromtimestamp(now)
                     + datetime.timedelta(seconds=1)))
        except taskqueue.TaskAlreadyExistsError:
            pass  # Fan-in magic
    finally:
        # Always release the writer count, including when put() fails;
        # otherwise join() keeps waiting for a writer that is already gone.
        memcache.decr(lock)
    return True
def join(sum_name, index):
    """Fold all pending work for batch ``index`` into the stored sum (join side).

    New writers are moved to the next batch index, writers still registered
    on this batch's lock are waited for (up to 20 polls of 0.25 s), then the
    batch's ``MyWork`` entities are summed, added to ``MySum.total`` inside a
    datastore transaction, and deleted.

    Args:
        sum_name: Name of the sum; also the key name of the ``MySum`` entity.
        index: Batch index whose work items should be folded in.
    """
    # force new writers to use the next index
    memcache.incr('index-' + sum_name)
    lock = '%s-lock-%d' % (sum_name, index)
    # Dropping the counter by 2 ** 15 puts it below the 2 ** 16 threshold
    # that insert() checks, so late writers are turned away.
    memcache.decr(lock, 2 ** 15)  # You missed the boat
    # busy wait for writers
    for _ in xrange(20):  # timeout after 5s
        counter = memcache.get(lock)
        # At or below 2 ** 15 means no writer is still registered.
        if counter is None or int(counter) <= 2 ** 15:
            break
        time.sleep(0.25)

    # The property name and operator must be separated by a space; with
    # 'work_index=' the query would not filter on the work_index property.
    work_index = '%s-%d' % (sum_name, knuth_hash(index))
    query = MyWork.all().filter('work_index =', work_index).order('__key__')
    results = list(query)
    delta = sum(r.delta for r in results)

    def txn():
        # Create the sum entity on first use, then apply the batch total.
        my_sum = MySum.get_by_key_name(sum_name)
        if my_sum is None:
            my_sum = MySum(key_name=sum_name, name=sum_name, total=0)
        my_sum.total += delta
        my_sum.put()

    db.run_in_transaction(txn)
    # Deletion happens after, and outside of, the transaction above.
    db.delete(results)
@deemson
Copy link

deemson commented Jun 5, 2015

GJ, mate! Really useful. You have a typo in line 44, though. It's "sum(...)" not "sum_name(...)".

@billy1380
Copy link
Author

fixed thanks @deemson

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment