dag template
# !! AUTO-GENERATED !!
# application_id: {{{ ni_application_id }}}
# version: {{{ version }}}
import airflow
from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from operators.emr_cluster_name_to_id import EmrClusterNameToIdOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.models import Variable
from itertools import chain
from datetime import datetime, timedelta
import re
import json
import boto3
DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(1987, 6, 14),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False
}
ni_application_id = '{{{ ni_application_id }}}'
properties = [{{#properties}}'{{{name}}}', {{/properties}}]
property_keys = list(map(lambda x: '--' + x, properties))
property_values = [{{#properties}}"{% if dag_run.conf['{{{name}}}'] is defined %}{" + "{ dag_run.conf['{{{name}}}'] }" + "}{% else %}" + "{{{value}}}" + "{% endif %}", {{/properties}}]
ni_app_properties = list(chain.from_iterable(zip(property_keys, property_values)))
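# Note on the split braces above: this file is itself rendered by a Mustache-style
# template engine (the '{{{ ... }}}' placeholders), so literal '{{ ... }}' markers meant
# for Airflow's Jinja templating are built by string concatenation ("{" + "{ ... }" + "}").
# The rendered DAG then contains plain '{{ dag_run.conf[...] }}' expressions, which Airflow
# resolves at trigger time, so per-run overrides take precedence over the generated defaults.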
application_name = '{{{ ni_application_id }}}' + '-' + Variable.get("env", default_var='')
jar_path = "{% if dag_run.conf['jar-path'] is defined %}{" + "{ dag_run.conf['jar-path'] }" + "}{% else %}" + "{{{ jar_location }}}" + "{% endif %}"
additional_config = json.loads(Variable.get('additional-config', default_var='{}'))
additional_config_array = []
if 'configs' in additional_config:
    additional_config_array = additional_config['configs']
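# The 'additional-config' Airflow Variable is expected to hold a JSON object whose optional
# 'configs' key is a list of extra spark-submit arguments that are appended verbatim to the
# step below, e.g. {"configs": ["--conf", "spark.executor.memory=4g"]} (example value only,
# not part of the template).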
# Define the configuration for the Spark job execution
SPARK_STEPS = [
    {
        'Name': application_name,
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit', '--deploy-mode', 'cluster',
                '--name', application_name,
                '--class', 'com.naturalint.data.spark.api.scala.NiSparkAppMain',
                jar_path,
                '--jar-path', jar_path,
                '--ni-main-class', '{{{ main_class }}}',
                '--ni-application-id', application_name,
                '--env', Variable.get("env", default_var=''),
                '--cloudwatch-reporting-enabled', Variable.get('cloudwatch-reporting-enabled', default_var='false'),
                '--audit-reporting-enabled', Variable.get('audit-reporting-enabled', default_var='false'),
                '--checkpoint-store', Variable.get('checkpoint-store', default_var='false'),
                '--checkpoint-path', Variable.get('checkpoint-path', default_var='false')
            ] + additional_config_array + ni_app_properties
        }
    }
]
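# 'steps' is a templated field of EmrAddStepsOperator, so the Jinja expressions embedded in
# jar_path and in the property values above are rendered when the DAG run starts;
# command-runner.jar then executes the resulting spark-submit command on the EMR master node.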
schedule_interval = '{{{ schedule }}}'
if schedule_interval.lower() == 'none':
    schedule_interval = None
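# With schedule_interval set to None the DAG is never scheduled automatically and only runs
# when triggered manually or via the API.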
# Define the Airflow DAG
dag = DAG(
    ni_application_id,
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(minutes={{{ timeout_minutes }}}),
    catchup=False,
    schedule_interval=schedule_interval
)
# Resolve the EMR cluster id from the cluster name
cluster_id_resolver = EmrClusterNameToIdOperator(
    task_id='resolve_emr_cluster_id',
    cluster_name="{{{ emr_cluster_name }}}",
    dag=dag
)
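# EmrClusterNameToIdOperator is a custom in-house operator (imported from the local
# 'operators' package rather than from Airflow itself); it presumably looks up the id of the
# running cluster with the given name and pushes it to XCom for the downstream tasks.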
# Add the Spark step to the EMR cluster with the specified configuration
step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{" + "{ task_instance.xcom_pull('resolve_emr_cluster_id', key='return_value') }" + "}",
    steps=SPARK_STEPS,
    dag=dag
)
# Sensor task that waits until the Spark step has finished
step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{" + "{ task_instance.xcom_pull('resolve_emr_cluster_id', key='return_value') }" + "}",
    step_id="{" + "{ task_instance.xcom_pull('add_steps', key='return_value')[0] }" + "}",
    dag=dag
)
cluster_id_resolver.set_downstream(step_adder)
step_adder.set_downstream(step_checker)
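
Below is a minimal rendering sketch showing how a generator might expand this Mustache-style template into a concrete DAG file. It is not part of the gist: the choice of the chevron package, the file names, and the context values are illustrative assumptions, not the actual generator behind the "AUTO-GENERATED" header.

# rendering_sketch.py -- illustrative only, not part of the generated DAG
import chevron  # any Mustache renderer would do; chevron is an assumption

with open('dag_template.py.mustache') as f:  # hypothetical template file name
    template = f.read()

# Example context; the keys mirror the {{{ ... }}} placeholders and the {{#properties}} section.
context = {
    'ni_application_id': 'my-spark-app',
    'version': '1.0.0',
    'jar_location': 's3://my-bucket/jars/my-spark-app.jar',
    'main_class': 'com.example.MySparkApp',
    'schedule': '@daily',
    'timeout_minutes': 60,
    'emr_cluster_name': 'analytics-emr',
    'properties': [
        {'name': 'input-path', 'value': 's3://my-bucket/input'},
        {'name': 'output-path', 'value': 's3://my-bucket/output'},
    ],
}

# Write the rendered DAG file where the Airflow scheduler can pick it up.
with open('my_spark_app_dag.py', 'w') as f:
    f.write(chevron.render(template, context))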