Skip to content

Instantly share code, notes, and snippets.

@IceS2
Created March 24, 2016 21:22

Revisions

  1. IceS2 renamed this gist Mar 24, 2016. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  2. IceS2 created this gist Mar 24, 2016.
    42 changes: 42 additions & 0 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,42 @@
    from airflow import DAG
    from airflow.operators import PythonOperator, TriggerDagRunOperator
    from datetime import datetime, timedelta
    import sys
    sys.path.append('/home/pablo/workspace/scratch/')
    from default_test import default_test

    default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.now() - timedelta(minutes=15),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    }


    def trigger(context, dag_run_obj):
    if context['params']['condition_param']:
    return dag_run_obj

    dag = DAG('dag_with_trigger', default_args=default_args, schedule_interval=timedelta(minutes=15))

    first = PythonOperator(
    task_id='first',
    python_callable=default_test,
    dag=dag)

    trigger = TriggerDagRunOperator(task_id='trigger',
    trigger_dag_id="target_dag",
    python_callable=trigger,
    params={'condition_param': True,
    'message': 'Running'},
    trigger_rule='all_success',
    dag=dag)

    trigger.set_upstream(first)