Created
April 25, 2011 21:02
-
-
Save dialtone/941213 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def funpath(fun):
    """Return the dotted ``<module>.<name>`` path of ``fun``.

    The path is built from ``fun.__module__`` and ``fun.__name__``.
    """
    module_name = fun.__module__
    short_name = fun.__name__
    return ".".join((module_name, short_name))
def getm(fun):
    """Look up the object published under the dotted path of ``fun``.

    The path comes from ``funpath(fun)`` and is resolved with ``namedAny``
    (presumably Twisted's ``twisted.python.reflect.namedAny`` -- confirm
    against the file's imports).
    """
    dotted_path = funpath(fun)
    return namedAny(dotted_path)
class LockMap(object):
    """
    Registry of task locks plus helpers to inspect them in memcache.

    The LockMap object doesn't really have any active part in
    the execution of tasks. It is however useful for the user
    interface in order to understand the status of locks held
    in memcache.

    Every registered task name maps to a ``(lock_key, expires_in)`` pair:
    ``lock_key`` is the memcache key used for the lock and ``expires_in``
    the lifetime (in seconds) the lock was declared with.
    """

    def __init__(self):
        # task name -> (memcache key, expires_in), filled by add_lock().
        self.lockmap = {}

    @property
    def mc(self):
        """Memcache client, obtained from ``cache`` on every access."""
        return cache.get_memcache_client()

    def key_for_name(self, task_name):
        """Return the memcache key registered for ``task_name``.

        Raises ``TypeError`` if nothing was registered under that name,
        because the lookup then yields ``None``, which cannot be indexed.
        """
        return self.get_locks(task_name)[0]

    def add_lock(self, task_name, value):
        """Register ``value`` (a ``(lock_key, expires_in)`` pair) for a task."""
        self.lockmap[task_name] = value

    def get_locks(self, task_name=None):
        """Return the entry for ``task_name``, or the whole map if omitted.

        An unknown ``task_name`` yields ``None``.
        """
        if task_name is not None:
            return self.lockmap.get(task_name)
        return self.lockmap

    def _stored_expiry(self, task_name):
        """Return the value stored in memcache for the task's lock.

        ``acquire`` and ``refresh_lock`` store the absolute expiry
        timestamp; ``None`` means the key is not present.
        """
        key = self.key_for_name(task_name)
        return self.mc.get(key)

    @staticmethod
    def _seconds_left(stored_expiry):
        """Turn a stored expiry timestamp into whole seconds remaining.

        Returns ``"-"`` when there is no (truthy) stored timestamp.
        """
        if not stored_expiry:
            return "-"
        return int(stored_expiry - time.time())

    def is_acquired(self, task_name):
        """Return True if the task's lock key currently exists in memcache."""
        return self._stored_expiry(task_name) is not None

    def acquire(self, task_name, expires_in=3600):
        """Try to take the lock for ``task_name``.

        Uses the client's ``add`` so an already existing key is left alone;
        the stored value is the absolute expiry timestamp. Returns whatever
        the client's ``add`` returns.
        """
        return self.mc.add(self.key_for_name(task_name), time.time()+expires_in, expires_in)

    def release(self, task_name):
        """Delete the lock key for ``task_name``; returns the client's result."""
        return self.mc.delete(self.key_for_name(task_name))

    def releases_in(self, task_name):
        """Return the seconds until the lock expires, or ``"-"`` if not held."""
        return self._seconds_left(self._stored_expiry(task_name))

    def refresh_lock(self, task_name, expires_in=None):
        """Unconditionally (re)write the lock with a fresh expiry.

        ``expires_in`` falls back to the lifetime registered for the task and
        finally to 3600 seconds.
        """
        key, original_expires_in = self.get_locks(task_name)
        expires_in = expires_in or original_expires_in or 3600
        return self.mc.set(key, time.time()+expires_in, expires_in)

    def get_locks_table(self):
        """Return one row per registered lock, sorted by lock key.

        Each row is ``[short_name, info]``, where ``short_name`` is the last
        two dotted components of the task name and ``info`` is a dict with
        ``lockname``, ``fullname``, ``availability`` (``"NO"`` while held,
        otherwise ``"OK"``), ``releases_in`` and ``expires_in``.
        """
        rows = []
        by_lock_key = sorted(self.lockmap.items(), key=lambda item: item[1][0])
        for fun, (lockname, expires_in) in by_lock_key:
            # Read memcache once per row: fewer round trips, and both
            # columns below are derived from the same observed value.
            stored = self._stored_expiry(fun)
            rows.append([
                ".".join(fun.split(".")[-2:]),
                {"lockname": lockname,
                 "fullname": fun,
                 "availability": "NO" if stored is not None else "OK",
                 "releases_in": self._seconds_left(stored),
                 "expires_in": expires_in},
            ])
        return rows
# Decorators

# Module-level LockMap instance; the ``lock`` decorator registers every
# function it decorates here.
lock_map = LockMap()
def lock(name=None, expires_in=3600, loglevel=None):
    """Decorator factory that skips a task run while its lock is held.

    At decoration time the function is registered in ``lock_map`` under its
    dotted path (``funpath(fun)``) with the pair ``(name or path, expires_in)``.

    At call time the wrapper resolves the task object with ``getm(fun)``
    (presumably a Celery task -- it must provide ``get_logger``,
    ``update_state`` and ``name``), then tries to acquire the lock:

    * acquired: ``fun`` is called, the lock is always released afterwards,
      and the result of ``fun`` is returned (exceptions propagate);
    * not acquired: the task state is set to ``"IGNORED"`` with the call
      arguments as metadata, and ``None`` is returned.

    :param name: memcache key for the lock; defaults to the dotted path.
    :param expires_in: lock lifetime in seconds.
    :param loglevel: passed through to ``task.get_logger``.
    """
    def _(fun):
        # The lock is registered under this key, so every later lookup must
        # use it too. Looking up by ``task.name`` only works while the task
        # name happens to equal the dotted path, and fails for custom names.
        task_path = funpath(fun)
        lock_map.add_lock(task_path, (name or task_path, expires_in))

        def __(*args, **kwargs):
            task = getm(fun)
            logger = task.get_logger(loglevel=loglevel)
            lock_key = lock_map.key_for_name(task_path)
            logger.debug("Acquiring lock %s" % (lock_key,))
            if not lock_map.acquire(task_path, expires_in):
                logger.info("Ignored because of multiple execution")
                task.update_state(state="IGNORED", meta={"args": args, "kwargs": kwargs})
                # Even worse I could set a value on fun and have autoretry check for it before
                # going in retry mode, if it's not there or it's the value for retry then go on
                # alternatively one could impose that lock comes before autoretry so that it's
                # possible to raise from within it without a triggered retry...
                return None
            try:
                logger.debug("Acquired lock %s" % (lock_key,))
                result = fun(*args, **kwargs)
                logger.debug("Done with %s" % (task.name,))
            finally:
                # Release even when ``fun`` raised, so the task can run again.
                lock_map.release(task_path)
                logger.debug("Released lock %s" % (lock_key,))
            return result

        # Make the wrapper look like ``fun`` so that funpath()/getm() and
        # anything else that names the task by module and name still resolve.
        __.__module__ = fun.__module__
        __.__name__ = fun.__name__
        __.__doc__ = fun.__doc__
        return __
    return _
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment