Skip to content

Instantly share code, notes, and snippets.

@r39132
Created September 2, 2016 01:06
Show Gist options
  • Save r39132/ad9601e5d056f655936c0832ca772d6d to your computer and use it in GitHub Desktop.
Save r39132/ad9601e5d056f655936c0832ca772d6d to your computer and use it in GitHub Desktop.
from datetime import datetime
from airflow.models import DAG
from airflow.operators import BashOperator, ShortCircuitOperator
import logging
def skip_to_current_job(ds, **kwargs):
now = datetime.now()
left_window = kwargs['dag'].following_schedule(kwargs['execution_date'])
right_window = kwargs['dag'].following_schedule(left_window)
logging.info(('Left Window {}, Now {}, Right Window {}').format(left_window,now,right_window))
if not now <= right_window:
logging.info('Not latest execution, skipping downstream.')
return False
return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment