Skip to content

Instantly share code, notes, and snippets.

@adamantnz
Last active June 18, 2019 20:43
Show Gist options
  • Save adamantnz/36c826e32873649f4941c4cfa43f9762 to your computer and use it in GitHub Desktop.
Save adamantnz/36c826e32873649f4941c4cfa43f9762 to your computer and use it in GitHub Desktop.
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from pathlib import Path
import boto3
# dag methods
def on_failure(context):
# specify what should happen when any operator fails
# i.e post to slack, send an email etc.
# pass this method to to your default_args dict so that
# all your workflow steps inherit this behaviour
pass
def xcom_push_example(**kwargs):
task_instance = kwargs["ti"]
# push a key/value pair to xcom using xcom_push()
task_instance.xcom_push(key="mykey", value="myvalue")
def xcom_pull_example(**kwargs):
task_instance = kwargs["ti"]
# retrieve the key/value pair from xcom using xcom_pull()
xcom_value = task_instance.xcom_pull("xcom_push_example", key="mykey")
return f"xcom value: {xcom_value}"
def upload_file_example():
s3 = boto3.resource("s3")
try:
s3.Bucket("bucketname").upload_file(
"/local/file/aliens.jpg",
"/s3path/aliens.jpg"
)
except Exception as err:
raise (f"ERROR: {err}")
# dag configuration
default_args = {
"owner": "AG",
"start_date": datetime(2018, 12, 31),
"concurrency": 1,
"max_active_runs": 1,
"retries": 1,
"retry_delay": timedelta(minutes=5),
"on_failure_callback": on_failure,
"provide_context": True
}
with DAG(
dag_id=Path(__file__).stem,
default_args=default_args,
description="a super descriptive description",
schedule_interval="00 12 * * *",
catchup=False
) as dag:
get_hostname_operator = BashOperator(
task_id="get_hostname",
bash_command="hostname")
xcom_push_operator = PythonOperator(
task_id="xcom_push_operator",
python_callable=xcom_push_example)
xcom_pull_operator = PythonOperator(
task_id="xcom_pull_operator",
python_callable=xcom_pull_example)
upload_file_operator = PythonOperator(
task_id="upload_file_operator",
python_callable=upload_file_example)
# dag workflow
get_hostname_operator >> \
xcom_push_operator >> \
xcom_pull_operator >> \
upload_file_operator
if __name__ == "__main__":
dag.cli()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment