Skip to content

Instantly share code, notes, and snippets.

@spektom
Created September 26, 2016 19:25
Show Gist options
  • Save spektom/2a7d11c1a7e114f4c7519173646dd5f7 to your computer and use it in GitHub Desktop.
Save spektom/2a7d11c1a7e114f4c7519173646dd5f7 to your computer and use it in GitHub Desktop.
Extending Airflow's Bash operator and making it exclusive using Consul locking mechanism
import consul
import consul_lock
from airflow.operators import BashOperator
class ExclusiveMixin(BashOperator):
"""
Acquires lock on this operator using either task ID or provided lock name,
so no other instance will be eligible to run. This mixin is disabled by default,
in order to enable it pass exclusive=True.
:param lock: Whether to enable this exclusive lock
:type lock: boolean
:param lock_name: Optional lock name to use to enable different operator groups have
different their own locks (defaults to operator's task_id).
:type lock_name: string
"""
def __init__(self, exclusive=False, lock_name=None, *args, **kwargs):
super(ExclusiveMixin, self).__init__(*args, **kwargs)
self.exclusive = exclusive
self.lock_name = lock_name or self.task_id
def execute(self, context):
if self.exclusive:
consul_lock.defaults.consul_client = consul.Consul(host=gethostname())
consul_lock.defaults.lock_key_pattern = "%s"
lock = consul_lock.EphemeralLock(self.lock_name, acquire_timeout_ms=36000000, lock_timeout_seconds=3600)
logging.info("Trying to acquire exclusive lock in Consul: %s" % self.lock_name)
with lock.hold():
logging.info("Lock acquired, proceeding to the command")
super(ExclusiveMixin, self).execute(context)
logging.info("Releasing exclusive lock")
else:
super(ExclusiveMixin, self).execute(context)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment