Submit a Spark job to an existing Amazon EMR cluster

Creates a step in Amazon EMR for a given cluster_id and monitors its progress using a sensor. A more complex example, one that involves cluster creation/termination, can be found here.

import airflow
from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from datetime import timedelta

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False
}

# A single EMR step that launches a Spark job in cluster deploy mode via command-runner.jar.
SPARK_TEST_STEPS = [
    {
        'Name': 'TestSparkJob1',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                '--deploy-mode', 'cluster',
                '--master', 'yarn',
                '--driver-memory', '1G',
                '--executor-memory', '1G',
                '--num-executors', '1',
                '--class', 'Main',
                's3://dev-bucket/test-spark-job-spark-assembly-0.1.0-SNAPSHOT.jar'
            ]
        }
    }
]

cluster_id = "j-xxxxxxxxxxxxx"

dag = DAG(
    'emr1',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=2),
    schedule_interval='0 3 * * *'
)

# Adds the step(s) above to the existing cluster and pushes the new step ids to XCom.
step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id=cluster_id,
    aws_conn_id='aws_default',
    steps=SPARK_TEST_STEPS,
    dag=dag
)

# Pulls the first step id from the add_steps task and polls it until it reaches a terminal state.
step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id=cluster_id,
    step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag
)

step_adder.set_downstream(step_checker)
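
For context, both tasks are thin wrappers around the EMR API. Below is a minimal boto3 sketch of the same add-and-poll flow, assuming default AWS credentials and region, and reusing the placeholder cluster id and jar path from the DAG above; it is an illustration, not a drop-in replacement for the operators.

import time

import boto3

emr = boto3.client('emr')

# Roughly what EmrAddStepsOperator does: submit the step and capture its id.
response = emr.add_job_flow_steps(
    JobFlowId='j-xxxxxxxxxxxxx',
    Steps=[{
        'Name': 'TestSparkJob1',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                '--deploy-mode', 'cluster',
                '--master', 'yarn',
                '--class', 'Main',
                's3://dev-bucket/test-spark-job-spark-assembly-0.1.0-SNAPSHOT.jar',
            ],
        },
    }],
)
step_id = response['StepIds'][0]

# Roughly what EmrStepSensor does: poll the step until it reaches a terminal state.
while True:
    step = emr.describe_step(ClusterId='j-xxxxxxxxxxxxx', StepId=step_id)
    state = step['Step']['Status']['State']
    if state in ('COMPLETED', 'CANCELLED', 'FAILED', 'INTERRUPTED'):
        break
    time.sleep(30)

print(state)

The sensor version is preferable in practice: Airflow handles retries, timeouts (dagrun_timeout above), and scheduling, instead of a blocking poll loop.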