# Skip to content
#
# Instantly share code, notes, and snippets.
#
# @seahrh
# Created March 23, 2018 08:39
# Show Gist options
#   - Save seahrh/17097e0aa940f4166d067e27bd0f6fea to your computer and use it in GitHub Desktop.
from datetime import datetime, timedelta
from airflow import DAG
from airflow import utils
from airflow.operators import BashOperator, EmailOperator, DummyOperator
# Arguments handed to the DAG constructor as `default_args`.
default_args = {
    'owner': 'myowner',
    'depends_on_past': False,
    # Naive datetime: midnight on 2017-10-18.
    'start_date': datetime(2017, 10, 18),
    # Failure notifications go to this list; retries do not send mail.
    'email': ['me@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    # Retries are disabled; the delay below only matters if 'retries'
    # is raised above zero.
    'retries': 0,
    'retry_delay': timedelta(minutes=10),
    'execution_timeout': timedelta(hours=2),
}
# Values passed to each BashOperator as `params`; 'script' is also used as
# the operator's bash_command.
params = {
    # What to run.
    'script': './run.sh',
    'jar': '/path/to/my.jar',
    'class': 'com.example.MyApp',

    # Resource sizing (kept as strings, exactly as the script receives them).
    'memory_overhead': '1024',
    'executor_cores': '4',
    'num_executors': '4',
    'driver_memory': '2g',
    'executor_memory': '15g',

    # Cluster / submission settings.
    'master': 'yarn',
    'deploy_mode': 'client',
    'queue': 'myqueue',
    'log_config_file': '/path/to/log4j.properties',
    'spark_home': '/path/to/spark2-client/bin',
    'proxy_user': 'myproxyuser',
    # NOTE: the only non-string value in this mapping.
    'spark_port_max_retries': 64,
}
# Identifier of the DAG; also used as the prefix of the per-venture task ids.
dag_id = "my_dag_id"

# Ventures to iterate over when building tasks (a single empty placeholder here).
ventures = ['']

dag = DAG(
    dag_id,
    default_args=default_args,
    # Cron expression: minute 15 of every hour.
    schedule_interval='15 * * * * ',
)

# Also bind the DAG object in the module namespace under its own id.
globals()[dag_id] = dag
# Build one script-running task per venture, each paired with an e-mail task
# that fires when the script task fails. The script tasks are collected in
# `tasks` in venture order.
tasks = []
for v in ventures:
    # Give every task its own copy of the parameters. Writing the venture
    # into the shared `params` dict and passing that same object to every
    # operator would leave all tasks seeing only the last venture assigned.
    task_params = dict(params, venture=v)

    prefix = dag_id + '_' + v
    task_id = prefix + '_task'
    task = BashOperator(
        task_id=task_id,
        bash_command=task_params['script'],
        params=task_params,
        dag=dag,
        # Run once upstream tasks have finished, whatever their outcome.
        trigger_rule='all_done',
    )

    email = EmailOperator(
        task_id='email_' + v,
        dag=dag,
        # The original referenced an undefined `email_recipients` name
        # (NameError when the file is parsed); reuse the recipient list
        # already configured in default_args.
        to=default_args['email'],
        subject="FAIL {{ task_instance_key_str }} {{ execution_date }}",
        html_content="Error message<br>{{ task_instance_key_str }}<br>{{ execution_date }}",
        # Only send the mail when the upstream script task has failed.
        trigger_rule='one_failed',
        queue='myqueue',
    )

    # The e-mail task hangs off its script task.
    task >> email
    tasks.append(task)
# Append a no-op task so the chain always ends on it.
# Workaround for https://issues.apache.org/jira/browse/AIRFLOW-1296
dummy = DummyOperator(task_id='last', dag=dag)
tasks.append(dummy)

# Run tasks in order: every task is made upstream of the one that follows it.
for upstream, downstream in zip(tasks, tasks[1:]):
    downstream.set_upstream(upstream)
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment