Created
October 3, 2013 10:17
-
-
Save tomotaka/6807853 to your computer and use it in GitHub Desktop.
cache-enabled counter using MongoDB's '$inc' operator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# -*- coding: utf-8 -*- | |
import logging
import math
import time
# Public names exported by ``from <module> import *``.
__all__ = ('MongoCounterError', 'MongoCounter')
class MongoCounterError(Exception):
    """Error raised by MongoCounter when it is used in an invalid state.

    Within this module it is raised by the ``last_count`` and
    ``last_count_fetch_time`` properties when no value has been fetched yet.
    """
class MongoCounter(object):
    """Named counter stored in MongoDB, with local write and read caching.

    Increments are accumulated in memory (``cache_count``) and written to
    MongoDB as one ``$inc`` update when flushed.  Reads through ``get()``
    are cached too, so the returned value may lag behind the database and
    never includes increments that have not been flushed yet.

    assumptions:
      - the counter collection holds multiple counters, one document each
      - doc sample:
          {'counter_name': 'testcounter', 'value': 0}
      - 'counter_name' identifies the counter
      - the current counter value is stored in 'value'

    usage:
        counter = MongoCounter(
            pymongo_connection=pymongo.MongoClient(),
            db_name='mydb',
            counter_name='mycounter',
            use_gevent=True  # if you want the gevent interval-flusher
        )
        counter.incr()
        current_counter = counter.get()

    NOTE: writes go through the legacy PyMongo ``Collection.update`` call
    (removed in PyMongo 4).  Do not "feature-detect" ``update_one`` with
    ``getattr``: on old PyMongo versions unknown attributes resolve to
    sub-collections instead of raising AttributeError.
    """

    def __init__(self, pymongo_connection, db_name, counter_name,
                 counter_collection='counters', flush_interval_ms=500,
                 use_gevent=False, gevent_flush_interval_s=5.0):
        """Bind to the counter document and create it when it is missing.

        pymongo_connection      -- connection object; the database is looked
                                   up as an attribute named ``db_name``
        db_name                 -- database name
        counter_name            -- value of 'counter_name' in the document
        counter_collection      -- collection holding the counter documents
        flush_interval_ms       -- ``incr()`` flushes once this many
                                   milliseconds passed since the last flush
        use_gevent              -- spawn a greenlet that flushes periodically
        gevent_flush_interval_s -- sleep between two background flushes, in
                                   seconds (only used with ``use_gevent``)
        """
        self.conn = pymongo_connection
        self.mongo_db_name = db_name
        self.mongo_collection_name = counter_collection
        database = getattr(self.conn, self.mongo_db_name)
        self.mongo_collection = database[self.mongo_collection_name]
        self.counter_name = counter_name
        self.flush_interval_ms = flush_interval_ms
        self.gevent_flush_interval_s = gevent_flush_interval_s
        self.last_flush_time_ms = self._now_time_ms()
        self.cache_count = 0
        self._last_get_count = None
        self._last_count_fetch_time = None
        self._exist = False
        self._flush_gthread_spawned = None
        self._make_sure_exist()
        if use_gevent:
            # gevent is optional, so it is imported only when requested.
            import gevent
            self._flush_gthread_spawned = gevent.spawn(self.__flush_gthread)

    def __flush_gthread(self):
        """Greenlet body: flush the cached increments periodically, forever."""
        import gevent
        log = logging.getLogger(__name__)
        while True:
            log.debug('sleeping@gthread')
            gevent.sleep(self.gevent_flush_interval_s)
            try:
                self.flush()
            except Exception:
                # flush() keeps the pending amount when the write fails, so
                # the next round retries instead of the flusher dying here.
                log.exception('flush@gthread failed')
            else:
                log.debug('flush@gthread')

    def _now_time_ms(self):
        """Return the current wall-clock time in whole milliseconds."""
        return math.floor(time.time() * 1000)

    def incr(self, incr_amount=1, cache=True):
        """Increment the counter.

        flush timing has not come => only the cached amount is updated
        flush timing exceeded, or ``cache`` is false => flush to MongoDB
        """
        self.cache_count += incr_amount
        flush_due = (self.last_flush_time_ms + self.flush_interval_ms
                     < self._now_time_ms())
        if not cache or flush_due:
            self.flush()

    def _incr_force(self, incr_amount):
        """Apply ``incr_amount`` to the counter value stored in MongoDB."""
        self._make_sure_exist()
        condition = {'counter_name': self.counter_name}
        counter_update = {'$inc': {'value': incr_amount}}
        # upsert: if the document disappeared after the existence check the
        # increment re-creates it instead of matching nothing and being lost.
        self.mongo_collection.update(condition, counter_update, upsert=True)

    def flush(self):
        """Write the cached counter difference to MongoDB.

        Does nothing when there is no pending amount.  Any exception raised
        by the write propagates; the pending amount is kept in that case.
        """
        pending = self.cache_count
        if pending == 0:
            return
        # Reset *before* the write: increments arriving while the write is
        # in progress (e.g. from another greenlet while the driver waits on
        # the network) then accumulate on top of zero and are not wiped out
        # by a reset performed after the write.
        self.cache_count = 0
        try:
            self._incr_force(pending)
        except BaseException:
            # Nothing was acknowledged: hand the amount back for a retry.
            self.cache_count += pending
            raise
        self.last_flush_time_ms = self._now_time_ms()

    def get(self, reload_if_older_than_ms=500, force=False):
        """Return the counter value, re-reading MongoDB when needed.

        The value is read from MongoDB when ``force`` is true, when nothing
        has been fetched yet, or when the last fetch is at least
        ``reload_if_older_than_ms`` milliseconds old; otherwise the value of
        the last fetch is returned.
        """
        fetched_at = self._last_count_fetch_time
        if force or fetched_at is None:
            # forced, or not fetched yet
            return self._get_count_force()
        if time.time() < fetched_at + (reload_if_older_than_ms / 1000.0):
            # fetched recently, not expired
            return self._last_get_count
        # a fetched value exists but it has already expired
        return self._get_count_force()

    def _get_count_force(self):
        """Read the value from MongoDB and remember it and the fetch time."""
        self._last_get_count = self._get_count_from_mongo()
        self._last_count_fetch_time = time.time()
        return self._last_get_count

    def _get_count_from_mongo(self):
        """Query MongoDB for the 'value' of the counter named counter_name."""
        self._make_sure_exist()
        condition = {'counter_name': self.counter_name}
        doc = self.mongo_collection.find(condition)[0]
        return doc['value']

    def _make_sure_exist(self):
        """Make sure a document for counter_name exists (checked once)."""
        if getattr(self, '_exist', False):
            # already checked for this counter_name
            return
        condition = {'counter_name': self.counter_name}
        if self.mongo_collection.find_one(condition) is None:
            # Not there yet: create it with value 0.  An upsert with
            # ``$inc: 0`` is used instead of a plain insert so that a
            # document created by someone else in the meantime is left
            # untouched rather than duplicated.
            self.mongo_collection.update(
                condition, {'$inc': {'value': 0}}, upsert=True)
        self._exist = True

    @property
    def last_count(self):
        """Value obtained by the last fetch.

        Raises MongoCounterError when nothing has been fetched yet.
        """
        if self._last_get_count is None:
            raise MongoCounterError('you have not fetched yet')
        return self._last_get_count

    @property
    def last_count_fetch_time(self):
        """``time.time()`` timestamp of the last fetch.

        Raises MongoCounterError when nothing has been fetched yet.
        """
        if self._last_count_fetch_time is None:
            raise MongoCounterError('you have not fetched yet')
        return self._last_count_fetch_time
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment