Skip to content

Instantly share code, notes, and snippets.

@robinedwards
Created October 16, 2018 11:21
Show Gist options
  • Save robinedwards/a7282544f6ccd6e20e295112e535fb0f to your computer and use it in GitHub Desktop.
Save robinedwards/a7282544f6ccd6e20e295112e535fb0f to your computer and use it in GitHub Desktop.
class ProbeOperator(PythonOperator):
def __init__(self, *args, attempts=None, interval=None, probe_timeout=None, **kwargs):
assert isinstance(attempts, int)
assert isinstance(interval, timedelta)
assert isinstance(probe_timeout, timedelta)
self.probe_timeout = probe_timeout
self.attempts = attempts
kwargs['retries'] = attempts
kwargs['retry_delay'] = interval
kwargs['execution_timeout'] = timedelta(seconds=20)
kwargs['provide_context'] = True
super(ProbeOperator, self).__init__(*args, **kwargs)
def execute_callable(self):
if self.op_kwargs['ti'].execution_date + self.probe_timeout < datetime.utcnow():
raise AirflowException("Probe timed out") # Airflow exception means task doesn't get retried
if not self.probe(*self.op_args, **self.op_kwargs):
raise ProbeConditionNotMet # task will then retry
return True
def probe(self, *args, **kwargs):
raise NotImplemented
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment