Skip to content

Instantly share code, notes, and snippets.

@adamantnz
Last active June 6, 2019 17:07
Show Gist options
  • Save adamantnz/91123520998485c013187540ab13030a to your computer and use it in GitHub Desktop.
Save adamantnz/91123520998485c013187540ab13030a to your computer and use it in GitHub Desktop.
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from pathlib import Path
# Deliberate anti-pattern demo: this statement sits at module top level, so it
# runs whenever the file itself is executed, not when a task is invoked.
print(
    "This print is in the global state, so it will print this string\n"
    "every time the scheduler parses the file. Don't do this!"
)
def my_function():
    """Print a demo message when called, rather than when the module is loaded.

    Keeping the ``print`` inside a function means the message is emitted only
    on invocation, unlike a module-level statement.
    """
    print(
        "This print is contained within a method, so it will only print this string\n"
        "when the method is invoked. Much better :)"
    )
def on_failure(context):
    """Handle an operator failure; currently a deliberate no-op placeholder.

    Args:
        context: Value supplied by whoever invokes the callback; unused here.

    Returns:
        None.
    """
    # specify what should happen when any operator fails i.e post to slack, send an email etc
def xcom_push_example(**kwargs):
    """Push a sample key/value pair (``"mykey"`` -> ``"myvalue"``) to XCom.

    Args:
        **kwargs: Must contain ``"ti"``, an object exposing
            ``xcom_push(key=..., value=...)``.

    Raises:
        KeyError: If ``"ti"`` is not present in ``kwargs``.
    """
    task_instance = kwargs["ti"]
    # push a key/value pair to xcom using xcom_push()
    task_instance.xcom_push(key="mykey", value="myvalue")
def xcom_pull_example(**kwargs):
    """Pull the ``"mykey"`` XCom value from the pushing task and format it.

    Args:
        **kwargs: Must contain ``"ti"``, an object exposing
            ``xcom_pull(task_ids=..., key=...)``.

    Returns:
        str: ``"xcom value: <value>"`` where ``<value>`` is whatever
        ``xcom_pull`` returned.

    Raises:
        KeyError: If ``"ti"`` is not present in ``kwargs``.
    """
    task_instance = kwargs["ti"]
    # retrieve the key/value pair from xcom using xcom_pull(); the lookup is by
    # the pushing task's task_id ("xcom_push_operator"), not by the name of the
    # Python callable that task runs
    xcom_value = task_instance.xcom_pull(task_ids="xcom_push_operator", key="mykey")
    return "xcom value: {}".format(xcom_value)
# Defaults applied to the operators attached to the DAG below.
args = {
    "owner": "AG",
    "start_date": datetime(2018, 12, 31),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "on_failure_callback": on_failure,
    "provide_context": True,
}

dag = DAG(
    # name the DAG after this file (without the .py extension)
    dag_id=Path(__file__).stem,
    default_args=args,
    description="a super descriptive description",
    # run each day at 12:00 UTC, validate your cron syntax at crontab.guru
    schedule_interval="00 12 * * *",
    # concurrency and max_active_runs are DAG constructor parameters, not
    # operator arguments, so they are set here rather than in default_args
    concurrency=1,
    max_active_runs=1,
    catchup=False,
)
# Task: run the `hostname` shell command.
get_hostname_operator = BashOperator(
    dag=dag,
    task_id="get_hostname",
    bash_command="hostname",
)

# Task: call xcom_push_example.
xcom_push_operator = PythonOperator(
    dag=dag,
    task_id="xcom_push_operator",
    python_callable=xcom_push_example,
)

# Task: call xcom_pull_example.
xcom_pull_operator = PythonOperator(
    dag=dag,
    task_id="xcom_pull_operator",
    python_callable=xcom_pull_example,
)

# Ordering: get_hostname -> xcom_push_operator -> xcom_pull_operator.
get_hostname_operator >> xcom_push_operator
xcom_push_operator >> xcom_pull_operator
# When this file is executed directly, hand control to the DAG's cli() method.
if __name__ == "__main__":
    dag.cli()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment