Created
September 26, 2016 19:25
-
-
Save spektom/2a7d11c1a7e114f4c7519173646dd5f7 to your computer and use it in GitHub Desktop.
Extending Airflow's Bash operator and making it exclusive using Consul locking mechanism
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging
from socket import gethostname

import consul
import consul_lock
from airflow.operators import BashOperator
class ExclusiveMixin(BashOperator):
    """
    Bash operator that can optionally run under an exclusive Consul lock.

    When enabled, a lock is acquired using either the provided lock name or
    the operator's task ID before the bash command runs, so that no other
    instance using the same lock name runs at the same time. Locking is
    disabled by default; pass ``exclusive=True`` to enable it.

    :param exclusive: Whether to run the command under the exclusive lock.
    :type exclusive: boolean
    :param lock_name: Optional lock name, allowing different groups of
        operators to have their own locks (defaults to the operator's task_id).
    :type lock_name: string
    """

    # How long to wait for the lock: 36,000,000 ms = 10 hours.
    ACQUIRE_TIMEOUT_MS = 36000000
    # Lock timeout passed to consul_lock: 3,600 s = 1 hour.
    LOCK_TIMEOUT_SECONDS = 3600

    def __init__(self, exclusive=False, lock_name=None, *args, **kwargs):
        super(ExclusiveMixin, self).__init__(*args, **kwargs)
        self.exclusive = exclusive
        # task_id is read only after the base initializer has run.
        self.lock_name = lock_name or self.task_id

    def execute(self, context):
        """
        Run the bash command, holding the Consul lock if ``exclusive`` is set.

        :param context: Airflow task context, forwarded unchanged to
            ``BashOperator.execute``.
        :return: Whatever ``BashOperator.execute`` returns.
        """
        if not self.exclusive:
            return super(ExclusiveMixin, self).execute(context)

        # NOTE: these are module-level consul_lock defaults, so they are
        # overwritten on every exclusive execution in this process.
        consul_lock.defaults.consul_client = consul.Consul(host=gethostname())
        # Presumably makes the lock name itself the Consul key, with no
        # prefix -- confirm against the consul_lock documentation.
        consul_lock.defaults.lock_key_pattern = "%s"

        lock = consul_lock.EphemeralLock(
            self.lock_name,
            acquire_timeout_ms=self.ACQUIRE_TIMEOUT_MS,
            lock_timeout_seconds=self.LOCK_TIMEOUT_SECONDS
        )

        logging.info("Trying to acquire exclusive lock in Consul: %s", self.lock_name)
        with lock.hold():
            logging.info("Lock acquired, proceeding to the command")
            try:
                # Propagate the base operator's result instead of dropping it.
                return super(ExclusiveMixin, self).execute(context)
            finally:
                # Logged on failure too; leaving the `with` block ends the hold.
                logging.info("Releasing exclusive lock")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment