Skip to content

Instantly share code, notes, and snippets.

Created January 10, 2013 18:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/4504409 to your computer and use it in GitHub Desktop.
Save anonymous/4504409 to your computer and use it in GitHub Desktop.
""" Demonstrating using the experimental dogpile.cache async_creator_factory
with a redis queue.
You'll need these requirements.
$ pip install retask redis python-memcached
You'll also need a running redis instance and a running memcached instance.
"""
from __future__ import print_function
from dogpile.cache import make_region
import time
import retask.queue
import retask.task
import json
from my_module import log
from my_module import my_func
# Initialize an outgoing redis queue right off the bat.
# NOTE: this connects to redis as a module-level side effect at import time;
# async_creator (below) closes over this ``queue`` to enqueue tasks.
queue = retask.queue.Queue("my_tasks")
queue.connect()
def async_creator_factory(cache, somekey, creator):
    """Return an async_creator callable for use by dogpile.core:Lock.

    :param cache: the dogpile cache region; its memcached backend url is
        forwarded to the worker so it can store the value and release the lock.
    :param somekey: the cache key the newly computed value will be stored under.
    :param creator: the zero-argument closure dogpile would normally call
        synchronously to compute the value.
    """

    def async_creator(mutex):
        """Used by dogpile.core:Lock when appropriate.

        Instead of directly computing the value, this instead adds a task to
        a redis queue with instructions for a worker on how to import and
        invoke the function that we want.

        It also assumes the cache is backed by memcached and so provides the
        worker both with the cache key for the new value as well as the
        memcached key for the distributed mutex (so it can be released later).
        """
        # The worker re-imports the creator by module path + attribute name.
        fn = dict(module=creator.__module__, name=creator.__name__)
        # Recover the values the creator closed over.  ``__code__`` and
        # ``__closure__`` are the documented spellings and work on both
        # Python 2.6+ and Python 3, unlike the legacy ``func_code`` /
        # ``func_closure`` attributes (removed in Python 3).
        freevar_dict = dict(zip(
            creator.__code__.co_freevars,
            [c.cell_contents for c in (creator.__closure__ or [])]
        ))
        task = retask.task.Task(json.dumps(dict(
            fn=fn,
            # dogpile's generated creator closes over ``arg`` and ``kw``;
            # a creator without those free variables will raise KeyError here.
            arg=freevar_dict['arg'],
            kw=freevar_dict['kw'],
            mutex_key=mutex.key,
            cache_key=somekey,
            memcached_addrs=cache.backend.url,
        )))
        # fire-and-forget
        queue.enqueue(task)

    return async_creator
if __name__ == '__main__':
    # Build a cache region whose dogpile lock computes missing values
    # asynchronously via async_creator_factory (experimental feature).
    region = make_region(
        async_creator_factory=async_creator_factory,
    ).configure(
        'dogpile.cache.memcached',
        expiration_time=5,
        arguments={
            'url': '127.0.0.1:11211',
            # distributed_lock=True backs the dogpile mutex with memcached,
            # so the separate worker process can release it later.
            'distributed_lock': True,
        }
    )
    # Mark the function imported from my_module as to-be-cached.
    my_func = region.cache_on_arguments()(my_func)
    # Interactive driver: each Enter press hits the cache; after the
    # 5-second expiration a redis task is enqueued instead of blocking.
    while True:
        # raw_input: this script targets Python 2.
        raw_input("Press enter to invoke the cached function")
        log("Calling it now...")
        value = my_func()
        log("Value was:", value)
""" This is a long-running worker process that generates values asychronously
for dogpile.cache.
"""
import json
import time
import retask.queue
import memcache
import dogpile.cache.api
import dogpile.cache.region
# Initialize an incoming redis queue right off the bat.
# Must name the same queue ("my_tasks") the producer enqueues to.
queue = retask.queue.Queue("my_tasks")
queue.connect()
# Poll the redis queue forever, computing one cached value per task.
while True:
    if queue.length == 0:
        #print("No tasks found in the queue. Sleeping for 2 seconds.")
        time.sleep(2)
        continue
    print "Picking up a job."
    task = queue.dequeue()
    data = json.loads(task.data)
    # Connect to the same memcached instance(s) the producer's region uses.
    mc = memcache.Client(data['memcached_addrs'])
    try:
        # Re-import the creator function by module path + name.
        # NOTE(review): __import__ returns the top-level package for dotted
        # names — presumably module names here are flat ("my_module"); verify
        # before using with dotted paths.
        module = __import__(data['fn']['module'])
        fn = getattr(module, data['fn']['name'])
        print "Calling {module}:{name}(*{arg}, **{kw})".format(
            arg=data['arg'], kw=data['kw'], **data['fn'])
        value = fn(*data['arg'], **data['kw'])
        # Wrap with dogpile's metadata so region.get() on the producer side
        # treats it like a normally-computed entry.
        value = dogpile.cache.api.CachedValue(value, {
            "ct": time.time(),
            "v": dogpile.cache.region.value_version,
        })
        mc.set(str(data['cache_key']), value)
    finally:
        # Release the kraken!  Always free the distributed dogpile mutex,
        # even if the creator raised, so producers are not deadlocked.
        mc.delete(str(data['mutex_key']))
    print "-" * 30
from __future__ import print_function
import time
import random
def my_func():
    """Simulate a computationally expensive operation.

    Sleeps for five seconds, then returns a random integer in [0, 100].
    """
    log("Really calling func. sleeping.")
    time.sleep(5)
    result = random.randint(0, 100)
    log("Waking. Cache gets new val", result)
    return result
def log(*messages):
    """Write *messages* to stdout, space-separated — a thin print wrapper."""
    print(" ".join(str(m) for m in messages))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment