Instantly share code, notes, and snippets.

Embed
What would you like to do?
Speeding up your MongoDB queries with Tornado Mixin
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function
from __future__ import unicode_literals
from hashlib import md5
import datetime
import tornado.web
from tornado import gen
from tornado.options import define, options
from handlers import DBMixin
# BUG FIX: the default must be a real bool, not the string "True" — with
# type=bool declared, a str default makes options.cache a string, so
# command-line toggling (--cache=false) and code-path truthiness disagree.
define("cache", default=True, type=bool, help="Enable/Disable the internal cache")
# In-process cache backend: md5 key -> {'d': cached data, 't': expiry datetime}
_cache = {}
class DBMixin(object):
    """Expose the application's shared database handle as ``self.db``.

    NOTE(review): this definition shadows the ``DBMixin`` imported from
    ``handlers`` at the top of the file — confirm which one is intended.
    """

    @property
    def db(self):
        # Delegate to the Application object that owns this handler.
        return self.application.db
class CacheHandler(tornado.web.RequestHandler, DBMixin):
    """Request handler that memoizes MongoDB queries, either in local
    process memory (the module-level ``_cache`` dict) or in a MongoDB
    ``_cache`` collection, with a per-entry timeout.
    """

    # Query-type discriminators for cache(). Class-level constants: every
    # instance shares them, so the previous per-instance __init__ override
    # is no longer needed (``self.FIND_ONE`` etc. still resolve).
    FIND_ONE = 1
    FIND = 2
    AGGREGATE = 3

    @gen.coroutine
    def cache(self, type, col, *args, **kwargs):
        """Run a query against collection ``col`` through the cache.

        ``type`` selects the motor call (FIND_ONE / FIND / AGGREGATE); the
        remaining ``args``/``kwargs`` are forwarded to it, except for the
        cache-control keywords consumed here:

            memory   (bool, default True)  cache in local memory instead of Mongo
            timeout  (int, default 60)     entry lifetime in seconds
            sort     (default None)        sort spec applied to FIND cursors
            to_list  (default None)        max documents for cursor.to_list()
            pipeline (AGGREGATE only)      the aggregation pipeline
            cursor   (AGGREGATE only)      aggregate() cursor options

        Samples:
        products = yield self.cache(self.FIND, 'products', {}, {'_id': 1}, to_list=50, memory=True)
        brand = yield self.cache(self.FIND_ONE, 'brands', find_one={'name': 'Acquaflora'}, memory=True)
        pipeline = [
            {'$match':
                {'cats_ids': {'$all': [ ObjectId(category['_id']) ]}}
            },
            {'$project':
                {'products': 1, 'min_price': 1, 'max_price': 1, 'n_products': 1, '_id': 1}
            },
            {'$sort': sort }, # Sort first to evaluate all prices
            {'$skip': self.calculate_page_skip(limit=limit)},
            {'$limit': limit}
        ]
        groups = yield self.cache(self.AGGREGATE, 'products_groups', memory=False, pipeline=pipeline)
        """
        memory = kwargs.pop('memory', True)    # Local Memory vs Mongo-backed cache
        timeout = kwargs.pop('timeout', 60)    # TTL in seconds
        sort = kwargs.pop('sort', None)
        # Build the cache key BEFORE popping to_list/pipeline/cursor so that
        # queries differing only in those still get distinct keys (this
        # matches the original key layout).
        signature = str(type) + col + str(args) + str(kwargs)
        key = md5(signature.encode('utf-8')).hexdigest()
        # BUG FIX: these used to remain inside kwargs and were forwarded to
        # find()/aggregate(), which reject unknown keyword arguments.
        to_list = kwargs.pop('to_list', None)
        pipeline = kwargs.pop('pipeline', [])
        cursor_opts = kwargs.pop('cursor', {})

        @gen.coroutine
        def get_key(key):
            """Return the cached entry for ``key`` or a falsy value."""
            if not options.cache:
                raise gen.Return(False)
            if memory:
                raise gen.Return(_cache.get(key, False))
            data = yield self.db['_cache'].find_one({'key': key})
            raise gen.Return(data)

        @gen.coroutine
        def set_key(key, value):
            """Store ``value`` under ``key`` with its expiry timestamp."""
            delta = datetime.datetime.now() + datetime.timedelta(seconds=timeout)
            if memory:
                _cache[key] = {
                    'd': value,
                    't': delta
                }
            else:
                yield self.db['_cache'].insert({
                    'key': key,
                    'd': value,
                    't': delta
                })

        @gen.coroutine
        def del_key(key):
            """Drop ``key`` from whichever cache backend is in use."""
            if memory:
                _cache.pop(key, None)
            else:
                yield self.db['_cache'].remove({'key': key})

        _key = yield get_key(key)
        if _key:
            # The entry is valid while its expiry still lies in the future.
            if _key['t'] >= datetime.datetime.now():
                raise gen.Return(_key['d'])
            yield del_key(key)  # expired: evict, then fall through to re-query

        # Cache miss (or expired): run the real query.
        if type == self.FIND_ONE:
            data = yield self.db[col].find_one(*args, **kwargs)
        elif type == self.FIND:
            if sort:
                cursor = self.db[col].find(*args, **kwargs).sort(sort)
            else:
                cursor = self.db[col].find(*args, **kwargs)
            data = yield cursor.to_list(to_list)
        elif type == self.AGGREGATE:
            cursor = self.db[col].aggregate(
                pipeline,
                *args,
                cursor=cursor_opts,
                **kwargs
            )
            data = yield cursor.to_list(to_list)
        else:
            # BUG FIX: an unknown type previously fell through to an
            # undefined ``data`` and raised a confusing NameError.
            raise ValueError('unknown cache query type: %r' % (type,))

        if options.cache:
            # Persist the fresh result before handing it back.
            yield set_key(key, data)
        raise gen.Return(data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment