Skip to content

Instantly share code, notes, and snippets.

@sangwonl
Created August 2, 2018 13:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sangwonl/6394867ef66c4398120d5c85ad067fa7 to your computer and use it in GitHub Desktop.
Save sangwonl/6394867ef66c4398120d5c85ad067fa7 to your computer and use it in GitHub Desktop.
airflow_python_xcom_sample.py
"""
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import logging
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime.today(),
"email": ["airflow@airflow.com"],
"email_on_failure": False,
"email_on_retry": False,
# "retries": 0,
# "retry_delay": timedelta(minutes=5),
# 'queue': 'default_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG(
"filter_even_numbers",
default_args=default_args,
catchup=False,
schedule_interval=timedelta(hours=15)
)
def fetch_input(**kwargs):
return list(range(100)
def filter_even(task_instance, **kwargs):
numbers = task_instance.xcom_pull(task_ids='fetch_input')
logging.debug(f'input numbers: {numbers}')
return list(filter(lambda x: x % 2 == 0, numbers))
t1 = PythonOperator(
task_id='fetch_input',
python_callable=fetch_input,
provide_context=True,
op_kwargs={},
dag=dag
)
t2 = PythonOperator(
task_id='filter_even',
python_callable=filter_even,
provide_context=True,
op_kwargs={},
dag=dag
)
t2.set_upstream(t1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment