Skip to content

Instantly share code, notes, and snippets.

@kinnou02
Created March 26, 2020 13:28
Show Gist options
  • Save kinnou02/5a2b4977fb8eef9ebfd857c22880ba66 to your computer and use it in GitHub Desktop.
Save kinnou02/5a2b4977fb8eef9ebfd857c22880ba66 to your computer and use it in GitHub Desktop.
def lock_release(lock, logger):
    """
    Release ``lock``, putting its token back in place if the release fails.

    When the lock exposes ``lock.local.token``, that value is saved before
    calling ``lock.release()``. If the release raises, the saved token (when
    truthy) is written back to ``lock.local.token`` so that a later attempt
    can still release the lock, the failure is logged through
    ``logger.exception`` and the original exception is re-raised.

    :param lock: object providing ``release()`` and, optionally, ``local.token``
    :param logger: logger used to report a failed release
    """
    # snapshot of the token, taken before release() gets a chance to clear it
    saved_token = (
        lock.local.token if hasattr(lock, 'local') and hasattr(lock.local, 'token') else None
    )

    try:
        lock.release()
    except BaseException:
        # catch-all on purpose: whatever went wrong is re-raised right below
        if saved_token:
            # the failed release has invalidated the token and any retry would fail,
            # restoring it lets a retry (e.g. after a reconnection) release the lock
            lock.local.token = saved_token
        logger.exception("exception when trying to release lock, will retry")
        raise
class Lock(object):
    """
    Decorator running a task only while holding a per-instance redis lock.

    The decorated function must receive a ``job_id`` argument and have a
    parameter named ``self`` (the bound task, used to call ``retry``).
    Before the function runs, a lock named ``tyr.lock|<instance name>`` is
    requested without blocking:

    - redis unreachable (``ConnectionError``): the task is retried in 10 sec
    - lock already held: the task is retried in 300 sec
    - lock acquired: the function runs and the lock is released afterwards,
      the release itself being attempted up to 5 times, 1 sec apart

    ``timeout`` is forwarded as the ``timeout`` argument of ``redis.lock``
    (presumably the lifetime of the lock in seconds -- to be confirmed
    against the redis client in use).

    usage:
        @celery.task(bind=True)
        @Lock(timeout=30 * 60)
        def mytask(self):
            pass
    """

    def __init__(self, timeout):
        self.timeout = timeout

    def __call__(self, func):
        """Return ``func`` wrapped with the acquire / run / release logic."""

        @wraps(func)
        def wrapper(*args, **kwargs):
            job_id = get_named_arg('job_id', func, args, kwargs)
            logging.debug('args: %s -- kwargs: %s', args, kwargs)
            job = models.Job.query.get(job_id)
            logger = get_instance_logger(job.instance, task_id=job_id)
            # the task is the positional argument bound to the ``self`` parameter of func;
            # ``__code__`` exists on python 2.6+ and python 3 whereas ``func_code`` is python 2 only
            task = args[func.__code__.co_varnames.index('self')]

            try:
                lock = redis.lock('tyr.lock|' + job.instance.name, timeout=self.timeout)
                locked = lock.acquire(blocking=False)
            except ConnectionError:
                logging.exception('Exception with redis while locking. Retrying in 10sec')
                task.retry(countdown=10, max_retries=10)
                # retry() normally raises; should it ever return, stop here instead of
                # going on with ``lock`` and ``locked`` undefined
                return None

            if not locked:
                countdown = 300
                logger.info('lock on %s retry %s in %s sec', job.instance.name, func.__name__, countdown)
                task.retry(countdown=countdown, max_retries=10)
                return None

            try:
                logger.debug('lock acquired on %s for %s', job.instance.name, func.__name__)
                return func(*args, **kwargs)
            finally:
                logger.debug('release lock on %s for %s', job.instance.name, func.__name__)
                # sometimes we are disconnected from redis when we want to release the lock,
                # so we retry only the release
                try:
                    retrying.Retrying(stop_max_attempt_number=5, wait_fixed=1000).call(
                        lock_release, lock, logger
                    )
                except ValueError:  # LockError(ValueError) since redis 3.0
                    logger.exception(
                        "impossible to release lock: continue but following task may be locked :("
                    )

        return wrapper
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment