Skip to content

Instantly share code, notes, and snippets.

@lucasc896
Last active April 28, 2016 15:50
Show Gist options
  • Save lucasc896/61b7602fefd313965c0583ac24974ea8 to your computer and use it in GitHub Desktop.
Save lucasc896/61b7602fefd313965c0583ac24974ea8 to your computer and use it in GitHub Desktop.
Airflow Example DAG
"""
Python test code for airflow investigations
"""
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime, timedelta
from modules import print_date, boto_workout, spark_test
# Shared settings passed to the DAG constructor as ``default_args``.
# One retry is allowed, five minutes after a failure, and both e-mail
# notification flags are switched off.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2016, 4, 11, 13, 20, 0),
    email=['chris.lucas@skimlinks.com'],
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
    # Optional keys that are deliberately left disabled in this example:
    #   queue='bash_queue',
    #   pool='backfill',
    #   priority_weight=10,
    #   end_date=datetime(2016, 1, 1),
)
# The DAG that every task in this file attaches to through ``dag=my_dag``.
# The cron expression '*/5 * * * *' schedules a run every five minutes.
my_dag = DAG(
    dag_id='python_test_dag',
    default_args=default_args,
    schedule_interval='*/5 * * * *',
)
# Root task: runs print_date.main, with ``op_kwargs`` supplying the
# keyword argument string='hello' to the callable.
task_print_date = PythonOperator(
    dag=my_dag,
    task_id='print_date',
    python_callable=print_date.main,
    op_kwargs=dict(string='hello'),
)
# Runs print_date.receive_xcom with the Airflow task context supplied
# (provide_context=True).
# NOTE(review): presumably the callable pulls an XCom value pushed by the
# 'print_date' task -- confirm against modules/print_date.
task_receive_xcom = PythonOperator(
    dag=my_dag,
    task_id='receive_xcom',
    python_callable=print_date.receive_xcom,
    provide_context=True,
)
# Runs boto_workout.main with no extra arguments.
task_boto_workout = PythonOperator(
    dag=my_dag,
    task_id='boto_workout',
    python_callable=boto_workout.main,
)
# Runs spark_test.spark_word_count with no extra arguments.
task_spark_test = PythonOperator(
    dag=my_dag,
    task_id='spark_test',
    python_callable=spark_test.spark_word_count,
)
# Dependency graph (a diamond):
#
#                 +--> boto_workout --+
#   print_date ---+                   +--> spark_test
#                 +--> receive_xcom --+
#
# print_date fans out to the two middle tasks, and spark_test waits for
# both of them to finish.
task_print_date.set_downstream([task_boto_workout, task_receive_xcom])
task_spark_test.set_upstream([task_receive_xcom, task_boto_workout])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment