public
Created

  • Download Gist
dogpiletest-redis-main.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
""" Demonstrating using the experimental dogpile.cache async_creator_factory
with a redis queue.
 
You'll need these requirements.
$ pip install retask redis python-memcached
 
You'll also need a running redis instance and a running memcached instance.
"""
 
from __future__ import print_function
 
from dogpile.cache import make_region
import time
import retask.queue
import retask.task
import json
 
from my_module import log
from my_module import my_func
 
# Initialize an outgoing redis queue right off the bat.
# The async_creator produced below enqueues work items here; the
# companion worker process consumes them from the same "my_tasks" queue.
queue = retask.queue.Queue("my_tasks")
queue.connect()
 
 
def async_creator_factory(cache, somekey, creator):
    """ Return an async_creator callable for use by dogpile.core:Lock.

    :param cache: the dogpile.cache region; assumed to be backed by
        memcached (its ``backend.url`` is forwarded to the worker).
    :param somekey: the cache key the new value should be stored under.
    :param creator: the zero-argument closure dogpile.cache built around
        the decorated function and its call arguments.
    """

    def async_creator(mutex):
        """ Used by dogpile.core:Lock when appropriate.

        Instead of directly computing the value, this adds a task to
        a redis queue with instructions for a worker on how to import and
        invoke the function that we want.

        It also assumes the cache is backed by memcached and so provides the
        worker both with the cache key for the new value as well as the
        memcached key for the distributed mutex (so it can be released later).
        """
        # Describe the creator by import path so a separate worker
        # process can re-import and call it.
        fn = dict(module=creator.__module__, name=creator.__name__)
        # Recover the original call's arguments from the closure cells of
        # the wrapper that dogpile.cache built.  ``__code__``/``__closure__``
        # replace the Python-2-only ``func_code``/``func_closure`` attribute
        # names (the dunder spellings exist since Python 2.6 and are the
        # only spellings in Python 3).
        freevar_dict = dict(zip(
            creator.__code__.co_freevars,
            [c.cell_contents for c in (creator.__closure__ or [])]
        ))
        # Assumes the closure exposes 'arg' and 'kw' free variables, as
        # dogpile.cache's cache_on_arguments wrapper does.
        task = retask.task.Task(json.dumps(dict(
            fn=fn,
            arg=freevar_dict['arg'],
            kw=freevar_dict['kw'],
            mutex_key=mutex.key,
            cache_key=somekey,
            memcached_addrs=cache.backend.url,
        )))
        # fire-and-forget: the worker computes the value, caches it, and
        # releases the mutex.
        queue.enqueue(task)

    return async_creator
 
 
if __name__ == '__main__':
    # Build a memcached-backed region whose dogpile locks hand value
    # creation off to the redis worker via the factory defined above.
    region = make_region(async_creator_factory=async_creator_factory)
    region = region.configure(
        'dogpile.cache.memcached',
        expiration_time=5,
        arguments={
            'url': '127.0.0.1:11211',
            'distributed_lock': True,
        },
    )

    # Mark the function imported from my_module as to-be-cached.
    my_func = region.cache_on_arguments()(my_func)

    while True:
        raw_input("Press enter to invoke the cached function")
        log("Calling it now...")
        log("Value was:", my_func())
dogpiletest-redis-worker.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
""" This is a long-running worker process that generates values asychronously
for dogpile.cache.
"""
 
import json
import time
import retask.queue
import memcache
 
import dogpile.cache.api
import dogpile.cache.region
 
# Initialize an incoming redis queue right off the bat.
queue = retask.queue.Queue("my_tasks")
queue.connect()
 
while True:
if queue.length == 0:
#print("No tasks found in the queue. Sleeping for 2 seconds.")
time.sleep(2)
continue
 
print "Picking up a job."
 
task = queue.dequeue()
data = json.loads(task.data)
 
mc = memcache.Client(data['memcached_addrs'])
 
try:
module = __import__(data['fn']['module'])
fn = getattr(module, data['fn']['name'])
print "Calling {module}:{name}(*{arg}, **{kw})".format(
arg=data['arg'], kw=data['kw'], **data['fn'])
value = fn(*data['arg'], **data['kw'])
 
value = dogpile.cache.api.CachedValue(value, {
"ct": time.time(),
"v": dogpile.cache.region.value_version,
})
 
mc.set(str(data['cache_key']), value)
finally:
# Release the kraken!
mc.delete(str(data['mutex_key']))
 
print "-" * 30
my_module.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
from __future__ import print_function
import time
import random
 
 
def my_func():
    """ This is my computationally expensive function.

    Pretends to work for five seconds, then produces a random integer
    in [0, 100].
    """
    log("Really calling func. sleeping.")
    time.sleep(5)
    fresh_value = random.randint(0, 100)
    log("Waking. Cache gets new val", fresh_value)
    return fresh_value
 
def log(*args):
    """ Write the given values to stdout, space-separated. """
    print(" ".join(str(piece) for piece in args))

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.