Skip to content

Instantly share code, notes, and snippets.

@IceS2
Created March 24, 2016 21:22
from airflow import DAG
from airflow.operators import PythonOperator, TriggerDagRunOperator
from datetime import datetime, timedelta
import sys
sys.path.append('/home/pablo/workspace/scratch/')
from default_test import default_test
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.now() - timedelta(minutes=15),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
def trigger(context, dag_run_obj):
if context['params']['condition_param']:
return dag_run_obj
dag = DAG('dag_with_trigger', default_args=default_args, schedule_interval=timedelta(minutes=15))
first = PythonOperator(
task_id='first',
python_callable=default_test,
dag=dag)
trigger = TriggerDagRunOperator(task_id='trigger',
trigger_dag_id="target_dag",
python_callable=trigger,
params={'condition_param': True,
'message': 'Running'},
trigger_rule='all_success',
dag=dag)
trigger.set_upstream(first)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment