Skip to content

Instantly share code, notes, and snippets.

@dialtone
Created April 25, 2011 21:02
Show Gist options
  • Save dialtone/941213 to your computer and use it in GitHub Desktop.
Save dialtone/941213 to your computer and use it in GitHub Desktop.
def funpath(fun):
    """Return the dotted ``<module>.<name>`` path of *fun*.

    Built from ``fun.__module__`` and ``fun.__name__`` joined by a dot.
    """
    module_name = fun.__module__
    short_name = fun.__name__
    return ".".join((module_name, short_name))
def getm(fun):
    """Return whatever ``namedAny`` resolves for the dotted path of *fun*.

    NOTE(review): ``namedAny`` is not defined in this section; presumably it
    is twisted.python.reflect.namedAny, which imports and returns the object
    bound at a fully qualified name -- confirm against the file's imports.
    """
    dotted_path = funpath(fun)
    return namedAny(dotted_path)
class LockMap(object):
    """
    Registry of per-task locks whose held/free state is kept in memcache.

    The LockMap object doesn't really have any active part in
    the execution of tasks. It is however useful for the user
    interface in order to understand the status of locks held
    in memcache.

    ``lockmap`` maps a task name to the value passed to ``add_lock``; the
    other methods expect that value to be a ``(memcache key, expires_in)``
    pair. While a lock is held, the memcache key stores the timestamp
    (``time.time() + expires_in``) written by ``acquire``/``refresh_lock``.
    """

    def __init__(self):
        # task name -> (memcache key, expires_in)
        self.lockmap = {}

    @property
    def mc(self):
        """Return the client obtained from ``cache.get_memcache_client()``.

        The client is fetched again on every access; nothing is cached here.
        """
        return cache.get_memcache_client()

    def key_for_name(self, task_name):
        """Return the memcache key registered for *task_name*.

        Raises ``TypeError`` when *task_name* was never registered, because
        ``get_locks`` returns ``None`` in that case.
        """
        return self.get_locks(task_name)[0]

    def add_lock(self, task_name, value):
        """Register *value* (a ``(key, expires_in)`` pair) under *task_name*."""
        self.lockmap[task_name] = value

    def get_locks(self, task_name=None):
        """Return the entry for *task_name*, or the whole map when omitted.

        An unregistered *task_name* yields ``None``.
        """
        if task_name is None:
            return self.lockmap
        return self.lockmap.get(task_name)

    def is_acquired(self, task_name):
        """Return True when memcache holds a value for the task's lock key."""
        return self.mc.get(self.key_for_name(task_name)) is not None

    def acquire(self, task_name, expires_in=3600):
        """Try to take the lock for *task_name*; return the client's result.

        Uses the client's ``add`` with the release timestamp as value and
        *expires_in* as the third argument (presumably the expiry in seconds,
        with ``add`` succeeding only if the key is absent -- confirm against
        the memcache client in use).
        """
        key = self.key_for_name(task_name)
        return self.mc.add(key, time.time() + expires_in, expires_in)

    def release(self, task_name):
        """Delete the task's lock key from memcache; return the client's result."""
        return self.mc.delete(self.key_for_name(task_name))

    @staticmethod
    def _format_remaining(timestamp):
        """Turn a stored release timestamp into whole seconds left, or "-"."""
        if not timestamp:
            return "-"
        return int(timestamp - time.time())

    def releases_in(self, task_name):
        """Return the seconds until the stored release time, or "-" if unset."""
        stored = self.mc.get(self.key_for_name(task_name))
        return self._format_remaining(stored)

    def refresh_lock(self, task_name, expires_in=None):
        """Overwrite the task's lock with a fresh release timestamp.

        *expires_in* falls back to the value registered for the task, then
        to 3600 when both are falsy.
        """
        key, original_expires_in = self.get_locks(task_name)
        expires_in = expires_in or original_expires_in or 3600
        return self.mc.set(key, time.time() + expires_in, expires_in)

    def get_locks_table(self):
        """Return one ``[short name, details]`` row per lock, sorted by key.

        The short name is the last two dotted components of the task name.
        Each row reads memcache exactly once, so "availability" and
        "releases_in" always describe the same stored value; reading twice
        could report a held lock ("NO") with no release time ("-") if the
        key expired between the two reads.
        """
        rows = []
        ordered = sorted(self.lockmap.items(), key=lambda item: item[1][0])
        for fun, (lockname, expires_in) in ordered:
            stored = self.mc.get(lockname)
            details = {
                "lockname": lockname,
                "fullname": fun,
                "availability": "NO" if stored is not None else "OK",
                "releases_in": self._format_remaining(stored),
                "expires_in": expires_in,
            }
            rows.append([".".join(fun.split(".")[-2:]), details])
        return rows
# Decorators
# Module-level LockMap instance: the ``lock`` decorator below registers each
# decorated function in it and acquires/releases locks through it.
lock_map = LockMap()
def lock(name=None, expires_in=3600, loglevel=None):
    """Decorator factory: run the decorated task only while holding a lock.

    name       -- memcache key for the lock; defaults to the function's
                  dotted ``module.name`` path.
    expires_in -- value passed to ``lock_map.acquire`` as the lock expiry.
    loglevel   -- forwarded to the task's ``get_logger``.

    At decoration time the function is registered in ``lock_map``. On each
    call the wrapper tries to acquire the lock: on success it runs the
    function, always releases the lock afterwards, and returns the
    function's result (exceptions propagate after the release). When the
    lock is already held the call is skipped, the task state is updated to
    "IGNORED", and ``None`` is returned.

    NOTE(review): the lock is released unconditionally; if the function
    runs longer than ``expires_in`` the release may remove a lock that some
    other caller acquired in the meantime -- confirm whether that matters.
    """
    def _(fun):
        task_path = funpath(fun)
        lock_map.add_lock(task_path, (name or task_path, expires_in))

        def __(*args, **kwargs):
            task = getm(fun)
            logger = task.get_logger(loglevel=loglevel)
            # Address the lock by the path it was registered under rather
            # than by task.name: the two differ when the task carries a
            # custom name, and the lookup would then fail.
            lock_key = lock_map.key_for_name(task_path)
            logger.debug("Acquiring lock %s", lock_key)
            if lock_map.acquire(task_path, expires_in):
                try:
                    logger.debug("Acquired lock %s", lock_key)
                    result = fun(*args, **kwargs)
                    logger.debug("Done with %s", task.name)
                finally:
                    lock_map.release(task_path)
                    logger.debug("Released lock %s", lock_key)
                return result
            logger.info("Ignored because of multiple execution")
            task.update_state(state="IGNORED", meta={"args": args, "kwargs": kwargs})
            # Even worse I could set a value on fun and have autoretry check for it before
            # going in retry mode, if it's not there or it's the value for retry then go on
            # alternatively one could impose that lock comes before autoretry so that it's
            # possible to raise from within it without a triggered retry...

        # Keep the wrapper resolvable under the same dotted path as the
        # wrapped function (getm relies on it) and preserve its docstring.
        __.__module__ = fun.__module__
        __.__name__ = fun.__name__
        __.__doc__ = fun.__doc__
        return __
    return _
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment