Instantly share code, notes, and snippets.

What would you like to do?
Speeding up your MongoDB queries with a Tornado mixin
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function
from __future__ import unicode_literals
from hashlib import md5
import datetime
import tornado.web
from tornado import gen
from tornado.options import define, options
from handlers import DBMixin
# Global switch for the query cache. The default is a real boolean so that it
# agrees with the declared ``type=bool`` (the previous default was the string
# "True", which is truthy whatever text it holds).
define("cache", default=True, type=bool, help="Enable/Disable the internal cache")

# Process-local cache store. Maps an md5 hex digest of the query signature to
# an entry of the form {'d': <cached data>, 't': <expiry datetime>}.
_cache = {}
class DBMixin(object):
    """Mixin exposing the application's database handle as ``self.db``.

    The host class must provide an ``application`` attribute that carries a
    ``db`` attribute (as a Tornado ``RequestHandler`` does when the
    ``Application`` object has been given one).
    """

    @property
    def db(self):
        """Return the database object stored on the application.

        Exposed as a property because users of this mixin subscript it
        directly (``self.db['collection']``) rather than calling it.
        """
        return self.application.db
class CacheHandler(tornado.web.RequestHandler, DBMixin):
    """Request handler base class that caches MongoDB query results.

    Results are kept either in the module-level ``_cache`` dict (process-local
    memory) or in the ``_cache`` collection of the application database,
    keyed by an md5 digest of the query signature and stamped with an expiry
    time.
    """

    def __init__(self, *args, **kwargs):
        super(CacheHandler, self).__init__(*args, **kwargs)
        # Query kinds accepted as the ``type`` argument of ``cache``.
        self.FIND_ONE = 1
        self.FIND = 2
        self.AGGREGATE = 3

    @gen.coroutine
    def cache(self, type, col, *args, **kwargs):
        """Run a MongoDB query through the cache and return its result.

        Usage (inside a coroutine)::

            products = yield self.cache(self.FIND, 'products', {}, {'_id': 1},
                                        to_list=50, memory=True)

            brand = yield self.cache(self.FIND_ONE, 'brands',
                                     {'name': 'Acquaflora'}, memory=True)

            pipeline = [
                {'$match': {'cats_ids': {'$all': [ObjectId(category['_id'])]}}},
                {'$project': {'products': 1, 'min_price': 1, 'max_price': 1,
                              'n_products': 1, '_id': 1}},
                {'$sort': sort},  # Sort first to evaluate all prices
                {'$skip': self.calculate_page_skip(limit=limit)},
                {'$limit': limit},
            ]
            groups = yield self.cache(self.AGGREGATE, 'products_groups',
                                      memory=False, pipeline=pipeline)

        Args:
            type: One of ``self.FIND_ONE``, ``self.FIND`` or
                ``self.AGGREGATE``.
            col: Name of the collection to query.
            *args: Positional arguments forwarded to ``find_one``/``find``.
            **kwargs: Options consumed here, the remainder being forwarded to
                the driver call:

                * ``memory`` (default ``True``): keep the entry in the
                  process-local dict; when false, use the ``_cache``
                  collection of the database instead.
                * ``timeout`` (default ``60``): entry lifetime in seconds.
                * ``sort`` (default ``None``): sort specification applied to
                  ``FIND`` cursors.
                * ``to_list``: length passed to ``cursor.to_list`` for
                  ``FIND`` and ``AGGREGATE``.
                * ``pipeline`` / ``cursor``: aggregation pipeline and cursor
                  options for ``AGGREGATE``.

        Returns (via ``gen.Return``):
            The cached data when a live entry exists, otherwise the freshly
            queried data.
        """
        memory = kwargs.pop('memory', True)  # Local Memory
        timeout = kwargs.pop('timeout', 60)
        sort = kwargs.pop('sort', None)

        # The signature is built before any query option is popped, so
        # ``to_list``/``pipeline``/``cursor`` take part in the key.
        signature = str(type) + col + str(args) + str(kwargs)
        if sort:
            # Queries differing only in sort order must not share an entry.
            # Appended only when set, so unsorted queries keep their old keys.
            signature += str(sort)
        key = md5(signature.encode('utf-8')).hexdigest()

        @gen.coroutine
        def get_key(key):
            """Return the stored entry for ``key``, or a falsy value."""
            if not options.cache:
                raise gen.Return(False)
            if memory:
                raise gen.Return(_cache.get(key) or False)
            data = yield self.db['_cache'].find_one({'key': key})
            raise gen.Return(data)

        @gen.coroutine
        def set_key(key, value):
            """Store ``value`` under ``key`` with an expiry ``timeout`` s ahead."""
            delta = datetime.datetime.now() + datetime.timedelta(seconds=timeout)
            if memory:
                _cache[key] = {
                    'd': value,
                    't': delta
                }
            else:
                # NOTE(review): the database is written only when
                # memory=False, mirroring get_key, which never reads the
                # database in memory mode.
                yield self.db['_cache'].insert({
                    'key': key,
                    'd': value,
                    't': delta
                })

        @gen.coroutine
        def del_key(key):
            """Drop the entry for ``key`` from whichever store is in use."""
            if memory:
                _cache.pop(key, None)
            else:
                yield self.db['_cache'].remove({'key': key})

        _key = yield get_key(key)
        if _key:
            # If the time in the future is still bigger than now
            if _key['t'] >= datetime.datetime.now():
                raise gen.Return(_key['d'])
            else:  # Invalid
                yield del_key(key)

        # otherwise (key not exist)
        if type == self.FIND_ONE:
            data = yield self.db[col].find_one(*args, **kwargs)
        elif type == self.FIND:
            # Pop before calling find() so ``to_list`` is not forwarded to
            # the driver as a query option.
            length = kwargs.pop('to_list', None)
            cursor = self.db[col].find(*args, **kwargs)
            if sort:
                cursor = cursor.sort(sort)
            data = yield cursor.to_list(length)
        elif type == self.AGGREGATE:
            cursor = self.db[col].aggregate(
                kwargs.pop('pipeline', []),
                cursor=kwargs.pop('cursor', {}),
            )
            data = yield cursor.to_list(kwargs.pop('to_list', None))

        if options.cache:
            # Persist the key
            yield set_key(key, data)
        raise gen.Return(data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment