Skip to content

Instantly share code, notes, and snippets.

@aviemzur
Created December 27, 2018 11:48
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aviemzur/b3102f6f384e6f5b61447e9266cd4f5e to your computer and use it in GitHub Desktop.
emr_add_step_example.py
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator \
import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator \
import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.contrib.operators.emr_terminate_job_flow_operator \
import EmrTerminateJobFlowOperator
# Default task arguments applied to every task in the DAG.
DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,          # each run is independent of prior runs
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,         # no alert mails from this example DAG
    'email_on_retry': False,
}
# EMR step definition: one spark-submit invocation of the factorization ETL,
# run through command-runner.jar in cluster deploy mode.
SPARK_TEST_STEPS = [
    {
        'Name': 'test step',
        'ActionOnFailure': 'CONTINUE',  # keep the cluster alive if the step fails
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                '--deploy-mode', 'cluster',
                '--class', 'com.naturalint.data.spark.api.scala.NiSparkAppMain',
                's3://ni-data-infra/jars/feeder-factorization-etl-1.0-SNAPSHOT.jar',
                '--ni-main-class', 'com.naturalint.data.etl.feeder.FactorizationEtl',
                '--job-id', '133',
                '--config-file', 's3://galactic-feeder-staging/factorization_input/133.json',
                '--raw-conversions-table', 'galactic_feeder_staging.conversions_raw',
                '--previous-runs-table', 'galactic_feeder_staging.factorization_output_partitions',
                '--parquet-output-location', 's3://galactic-feeder-staging/factorization_output_partitions',
                '--csv-output-location', 's3://galactic-feeder-staging/output',
            ],
        },
    },
]
# Overrides for EmrCreateJobFlowOperator (cluster stays up after steps finish).
# Currently unused while cluster creation is disabled below.
JOB_FLOW_OVERRIDES = {
    'Name': 'PiCalc',
    'KeepJobFlowAliveWhenNoSteps': True,
}
# Daily DAG (03:00 UTC) that submits Spark steps to an existing EMR cluster
# and waits for them to complete.
# Fix: removed leftover debug print() calls — DAG modules are re-parsed by the
# scheduler continuously, so stray prints spam the logs on every parse.
dag = DAG(
    'emr_job_flow_manual_steps_dag',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=2),
    schedule_interval='0 3 * * *',
)
# NOTE(review): on-demand cluster creation (EmrCreateJobFlowOperator with
# JOB_FLOW_OVERRIDES) is disabled; steps are submitted to an existing,
# hard-coded job flow instead. Re-introduce a create_job_flow task here if
# ephemeral clusters are needed again.

# Submit SPARK_TEST_STEPS to the running EMR cluster. The operator pushes the
# list of created step ids to XCom under key 'return_value'.
step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="j-3VQJPBNHI0US4",  # hard-coded existing cluster — TODO: parameterize
    aws_conn_id='aws_default',
    steps=SPARK_TEST_STEPS,
    dag=dag,
)
# Poll the submitted step until it reaches a terminal state. The step_id is a
# Jinja template resolved at runtime: EmrAddStepsOperator ('add_steps') pushes
# a list of step ids to XCom, and we watch the first (only) one.
step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="j-3VQJPBNHI0US4",
    step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag,
)

# Pipeline: add_steps -> watch_step.
# (Commented-out cluster_remover / create-flow wiring removed as dead code;
# restore an EmrTerminateJobFlowOperator if ephemeral clusters return.)
step_adder.set_downstream(step_checker)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment