Skip to content

Instantly share code, notes, and snippets.

@tomotaka
Created October 3, 2013 10:17
Show Gist options
  • Save tomotaka/6807853 to your computer and use it in GitHub Desktop.
Save tomotaka/6807853 to your computer and use it in GitHub Desktop.
cache-enabled counter using MongoDB's '$inc' operator
#!/usr/bin/python
# -*- coding: utf-8 -*-
import time
import math
# Public API of this module: the only names exported by a star-import.
__all__ = ('MongoCounterError', 'MongoCounter')
class MongoCounterError(Exception):
    """Error raised by MongoCounter.

    Raised when a cached value (last fetched count or its fetch time) is
    requested before any value has been fetched from MongoDB.
    """
class MongoCounter(object):
    """Named counter stored in MongoDB with a local write-behind cache.

    Increments are accumulated in memory and written to MongoDB with the
    ``$inc`` operator, either when the flush interval has elapsed, when the
    caller asks for an uncached increment, or when ``flush()`` is called.

    Assumptions:
      - one collection holds multiple counters, one document per counter
      - document sample: ``{'counter_name': 'testcounter', 'value': 0}``
      - ``counter_name`` identifies the counter
      - the current counter value is stored in ``value``

    Usage::

        counter = MongoCounter(
            pymongo_connection=pymongo.MongoClient(),
            db_name='mydb',
            counter_name='mycounter',
            use_gevent=True  # spawn a gevent greenlet that flushes periodically
        )
        counter.incr()
        current_counter = counter.get()
    """

    # Seconds the optional gevent flusher greenlet sleeps between flushes.
    GEVENT_FLUSH_INTERVAL_SEC = 5.0

    def __init__(self, pymongo_connection, db_name, counter_name,
                 counter_collection='counters', flush_interval_ms=500,
                 use_gevent=False):
        """Bind the counter to a collection and make sure its document exists.

        pymongo_connection: client object indexable by database name
        db_name: name of the database holding the counter collection
        counter_name: value of the ``counter_name`` field of this counter
        counter_collection: name of the collection holding the counters
        flush_interval_ms: minimum age of the last flush before ``incr()``
            writes the cached amount to MongoDB
        use_gevent: when true, spawn a greenlet that calls ``flush()``
            periodically (imports gevent lazily)
        """
        self.conn = pymongo_connection
        self.mongo_db_name = db_name
        self.mongo_collection_name = counter_collection
        # Item access instead of getattr(): attribute access would return a
        # client attribute when the database name collides with one.
        database = self.conn[self.mongo_db_name]
        self.mongo_collection = database[self.mongo_collection_name]
        self.counter_name = counter_name
        self.flush_interval_ms = flush_interval_ms
        self.last_flush_time_ms = self._now_time_ms()
        self.cache_count = 0
        self._last_get_count = None
        self._last_count_fetch_time = None
        self._exist = False
        self._flush_gthread_spawned = None
        self._make_sure_exist()
        if use_gevent:
            import gevent
            self._flush_gthread_spawned = gevent.spawn(self.__flush_gthread)

    def __flush_gthread(self):
        """Body of the background greenlet: sleep, flush, repeat forever."""
        import gevent
        while True:
            gevent.sleep(self.GEVENT_FLUSH_INTERVAL_SEC)
            self.flush()

    def _now_time_ms(self):
        """Return the current wall-clock time in whole milliseconds."""
        return math.floor(time.time() * 1000)

    def incr(self, incr_amount=1, cache=True):
        """Increment the counter.

        The amount is always added to the in-memory cache first. The cache
        is flushed to MongoDB immediately when ``cache`` is false or when
        more than ``flush_interval_ms`` has passed since the last flush.
        """
        self.cache_count += incr_amount
        flush_due = (self.last_flush_time_ms + self.flush_interval_ms
                     < self._now_time_ms())
        if not cache or flush_due:
            self.flush()

    def _incr_force(self, incr_amount):
        """Apply ``incr_amount`` to the counter document in MongoDB."""
        self._make_sure_exist()
        condition = {'counter_name': self.counter_name}
        counter_update = {'$inc': {'value': incr_amount}}
        # upsert: if the document vanished after the existence check was
        # cached, recreate it instead of silently dropping the increment.
        self.mongo_collection.update(condition, counter_update, upsert=True)

    def flush(self):
        """Write the cached counter difference to MongoDB.

        The pending amount is taken out of the cache *before* the write so
        that increments made while the write is in progress (e.g. by other
        greenlets) are kept for the next flush. If the write raises, the
        pending amount is put back and the exception propagates.
        """
        pending = self.cache_count
        if pending != 0:
            self.cache_count = 0
            try:
                self._incr_force(pending)
            except Exception:
                self.cache_count += pending
                raise
        self.last_flush_time_ms = self._now_time_ms()

    def get(self, reload_if_older_than_ms=500, force=False):
        """Return the counter value, re-reading MongoDB when needed.

        The value is fetched from MongoDB when ``force`` is true, when
        nothing has been fetched yet, or when the last fetch is older than
        ``reload_if_older_than_ms``; otherwise the last fetched value is
        returned. Increments still held in the local cache are not included.
        """
        if force or self._last_count_fetch_time is None:
            return self._get_count_force()
        expires_at = (self._last_count_fetch_time
                      + reload_if_older_than_ms / 1000.0)
        if time.time() < expires_at:
            # fetched recently, not expired
            return self._last_get_count
        return self._get_count_force()

    def _get_count_force(self):
        """Fetch the value from MongoDB and remember it and the fetch time."""
        self._last_get_count = self._get_count_from_mongo()
        self._last_count_fetch_time = time.time()
        return self._last_get_count

    def _get_count_from_mongo(self):
        """Query MongoDB for the value of the counter named counter_name."""
        self._make_sure_exist()
        doc = self.mongo_collection.find_one(
            {'counter_name': self.counter_name})
        if doc is None:
            # The document was removed after the existence check was
            # cached: recreate it rather than fail on a missing result.
            self._exist = False
            self._make_sure_exist()
            return 0
        return doc['value']

    def _make_sure_exist(self):
        """Create the counter document with value 0 if none exists yet.

        The check is done once per instance and cached in ``_exist``.
        """
        if getattr(self, '_exist', False):
            # already checked for this counter_name
            return
        condition = {'counter_name': self.counter_name}
        # "is None" rather than "count() == 1": never insert another
        # document when one (or several) already exist.
        if self.mongo_collection.find_one(condition) is None:
            self.mongo_collection.insert(
                {'counter_name': self.counter_name, 'value': 0})
        self._exist = True

    @property
    def last_count(self):
        """Counter value obtained by the most recent fetch.

        Raises MongoCounterError if nothing has been fetched yet.
        """
        if self._last_get_count is None:
            raise MongoCounterError('you have not fetched yet')
        return self._last_get_count

    @property
    def last_count_fetch_time(self):
        """``time.time()`` timestamp of the most recent fetch.

        Raises MongoCounterError if nothing has been fetched yet.
        """
        if self._last_count_fetch_time is None:
            raise MongoCounterError('you have not fetched yet')
        return self._last_count_fetch_time
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment