DAG template

# !! AUTO-GENERATED !!
# application_id: {{{ ni_application_id }}}
# version: {{{ version }}}
from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.models import Variable
from operators.emr_cluster_name_to_id import EmrClusterNameToIdOperator
from itertools import chain
from datetime import datetime, timedelta
import json
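
# Note: the airflow.contrib import paths above are Airflow 1.x-era; in Airflow 2
# the EMR operators and sensor moved to the Amazon provider package
# (airflow.providers.amazon).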

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(1987, 6, 14),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False
}
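
# Two templating layers are at work here: triple-brace {{{ ... }}} placeholders
# are filled in by the Mustache generator when this file is produced, while the
# "{" + "{ ... }" + "}" concatenations rebuild Jinja expressions that survive
# generation and are rendered by Airflow at run time (e.g. the dag_run.conf
# overrides below).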
ni_application_id = '{{{ ni_application_id }}}'
properties = [{{#properties}}'{{{name}}}', {{/properties}}]
property_keys = ['--' + name for name in properties]
property_values = [{{#properties}}"{% if dag_run.conf['{{{name}}}'] is defined %}{" + "{ dag_run.conf['{{{name}}}'] }" + "}{% else %}" + "{{{value}}}" + "{% endif %}", {{/properties}}]
# Interleave keys and values into a flat ['--key', 'value', ...] argument list
ni_app_properties = list(chain.from_iterable(zip(property_keys, property_values)))
application_name = '{{{ ni_application_id }}}' + '-' + Variable.get("env", default_var='')
jar_path = "{% if dag_run.conf['jar-path'] is defined %}{" + "{ dag_run.conf['jar-path'] }" + "}{% else %}" + "{{{ jar_location }}}" + "{% endif %}"
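
# Optional per-environment arguments: the 'additional-config' Airflow Variable
# may hold a JSON object whose 'configs' list is appended to the step arguments.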
additional_config = json.loads(Variable.get('additional-config', default_var='{}'))
additional_config_array = []
if 'configs' in additional_config:
    additional_config_array = additional_config['configs']

# Define the configuration for the Spark job execution
SPARK_STEPS = [
    {
        'Name': application_name,
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit', '--deploy-mode', 'cluster',
                '--name', application_name,
                '--class', 'com.naturalint.data.spark.api.scala.NiSparkAppMain',
                jar_path,
                '--jar-path', jar_path,
                '--ni-main-class', '{{{ main_class }}}',
                '--ni-application-id', application_name,
                '--env', Variable.get("env", default_var=''),
                '--cloudwatch-reporting-enabled', Variable.get('cloudwatch-reporting-enabled', default_var='false'),
                '--audit-reporting-enabled', Variable.get('audit-reporting-enabled', default_var='false'),
                '--checkpoint-store', Variable.get('checkpoint-store', default_var='false'),
                '--checkpoint-path', Variable.get('checkpoint-path', default_var='false')
            ] + additional_config_array + ni_app_properties
        }
    }
]
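
# A literal 'none' from the template means the DAG is unscheduled; Airflow
# expects None rather than the string.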
schedule_interval = '{{{ schedule }}}'
if schedule_interval.lower() == 'none':
    schedule_interval = None

# Define the Airflow DAG
dag = DAG(
    ni_application_id,
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(minutes={{{ timeout_minutes }}}),
    catchup=False,
    schedule_interval=schedule_interval
)

# Resolve the EMR cluster ID from its name
cluster_id_resolver = EmrClusterNameToIdOperator(
    task_id='resolve_emr_cluster_id',
    cluster_name="{{{ emr_cluster_name }}}",
    dag=dag
)
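
# job_flow_id and step_id below are Jinja expressions resolved at run time from
# XCom values pushed by the upstream tasks.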
# Add the Spark step to the EMR cluster with the configuration defined above
step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{" + "{ task_instance.xcom_pull('resolve_emr_cluster_id', key='return_value') }" + "}",
    steps=SPARK_STEPS,
    dag=dag
)

# Sensor that waits for the Spark step to finish
step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{" + "{ task_instance.xcom_pull('resolve_emr_cluster_id', key='return_value') }" + "}",
    step_id="{" + "{ task_instance.xcom_pull('add_steps', key='return_value')[0] }" + "}",
    dag=dag
)

cluster_id_resolver.set_downstream(step_adder)
step_adder.set_downstream(step_checker)
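
For reference, the gist does not show how this template is rendered into a concrete DAG file. A minimal sketch using the chevron Mustache library, with entirely hypothetical file names and context values, might look like this:

import chevron

# Hypothetical template path and context; the real generator is not shown
# in the gist.
with open('dag_template.py.mustache') as f:
    template = f.read()

context = {
    'ni_application_id': 'my-spark-app',
    'version': '1.0.0',
    'properties': [{'name': 'input-path', 'value': 's3://bucket/input'}],
    'jar_location': 's3://bucket/jars/my-spark-app.jar',
    'main_class': 'com.example.MyJob',
    'schedule': '@daily',
    'timeout_minutes': 60,
    'emr_cluster_name': 'analytics-cluster',
}

with open('my_spark_app_dag.py', 'w') as f:
    f.write(chevron.render(template, context))

Each key matches a placeholder in the template above; 'properties' drives the {{#properties}} section, emitting one '--key value' argument pair per entry.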