Skip to content

Instantly share code, notes, and snippets.

@r39132
Created February 12, 2016 02:03
Show Gist options
  • Save r39132/8a6b0433ec25450b50e9 to your computer and use it in GitHub Desktop.
Save r39132/8a6b0433ec25450b50e9 to your computer and use it in GitHub Desktop.
import logging
import datetime
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators import BashOperator, ExternalTaskSensor
from telemetry_pipeline_utils import *
# Scheduling constants and the DAG object.
#
# START is a rolling anchor: 10:00 on the calendar day two days before the
# moment this file is parsed.
# NOTE(review): Airflow's documentation advises against dynamic start_date
# values; confirm that recomputing START on every parse is intended.
START = (datetime.today() - timedelta(days=2)).replace(
    hour=10, minute=0, second=0, microsecond=0)
DAG_NAME = 'emr_model_building'

# Defaults handed to every task in the DAG; a task may override any of them.
# NOTE(review): e-mail alerts are switched on but no 'email' recipient list
# is set here -- verify one is configured elsewhere.
default_args = dict(
    pool='emr_model_building',
    depends_on_past=False,
    start_date=START,
    retries=1,
    retry_delay=timedelta(minutes=2),
    email_on_failure=True,
    email_on_retry=True,
)

# Cron expression '0 1 * * *': one run per day at 01:00.
dag = DAG(
    dag_id=DAG_NAME,
    default_args=default_args,
    schedule_interval='0 1 * * *',
)
# Jinja-templated bash scripts, one per pipeline step.  Each script picks the
# prod configuration when params.ENV == "PROD" and the stage configuration
# for any other value.

# Shell prefix shared by every step: source the login profile, then invoke
# the cluster automation script.
_CLUSTER_SH = ('source ~/.bash_profile; '
               '/home/deploy/automation/roles/cluster/cluster.sh')


def _env_switch(banner, subcommand):
    """Return the template lines that run ``subcommand`` per environment.

    The result is a list of lines forming an if/else/endif Jinja block: the
    first branch echoes ``banner`` and runs cluster.sh with the prod config,
    the second does the same with the stage config.
    """
    switch = ['{% if params.ENV == "PROD" %}']
    for env_label, conf_name, closing_tag in (
            ('Prod', 'model_building_prod.conf', '{% else %}'),
            ('Stage', 'model_building_stage.conf', '{% endif %}')):
        switch.append('echo "' + banner + ' in ' + env_label + ' Env"')
        switch.append(_CLUSTER_SH + ' ' + subcommand + ' ' + conf_name)
        switch.append(closing_tag)
    return switch


def _as_script(template_lines):
    """Join lines into one script that begins and ends with a newline."""
    return '\n' + '\n'.join(template_lines) + '\n'


# Bring the EMR cluster up: launch, provision and deploy in one invocation.
launch_emr = _as_script(
    _env_switch('Launching EMR cluster', 'launch,provision,deploy'))

# Run the "sd" cluster.sh subcommand (echoed as building sender models).
run_sm_and_reputation = _as_script(
    _env_switch('Building sender models', 'sd'))

# Run the "cdd" cluster.sh subcommand.
run_cdd = _as_script(
    _env_switch('Building CDD', 'cdd'))

# Tear the cluster down, but only when params.import_terminate_emr_cluster is
# true; otherwise the script just echoes that termination was skipped.
terminate_cluster = _as_script(
    ['{% if params.import_terminate_emr_cluster == true %}']
    + _env_switch('Terminating EMR cluster', 'terminate')
    + ['{% else %}',
       'echo "NOT terminating EMR cluster"',
       '{% endif %}'])
# Task graph: wait for the previous run, then launch -> build sender models
# -> build CDD -> terminate, strictly in sequence.


def _cluster_step(task_id, script, max_hours, template_params):
    """Create a BashOperator in this DAG that renders and runs ``script``.

    ``max_hours`` becomes the task's execution timeout and
    ``template_params`` is exposed to the Jinja template as ``params``.
    """
    return BashOperator(
        dag=dag,
        task_id=task_id,
        bash_command=script,
        params=template_params,
        pool='emr_model_building',
        execution_timeout=timedelta(hours=max_hours),
    )


# Gate: succeed only once this same DAG's 'terminate_cluster' task for the
# execution date one day earlier is in the 'success' state.
# NOTE(review): the very first run has no earlier run to observe -- confirm
# how that case is expected to be unblocked.
t0 = ExternalTaskSensor(
    dag=dag,
    task_id='wait_for_previous_run',
    external_dag_id=DAG_NAME,
    external_task_id='terminate_cluster',
    execution_delta=timedelta(days=1),
    allowed_states=['success'],
    trigger_rule='one_success',
)

t1 = _cluster_step(
    'launch_emr', launch_emr, 6,
    {'ENV': ENV,
     'import_terminate_emr_cluster': import_terminate_emr_cluster})

t2 = _cluster_step(
    'run_sm_and_reputation', run_sm_and_reputation, 3, {'ENV': ENV})

t3 = _cluster_step('run_cdd', run_cdd, 3, {'ENV': ENV})

# NOTE(review): no trigger_rule is set on this task, so it presumably runs
# only when every upstream step succeeded; verify the cluster is cleaned up
# some other way when an earlier step fails.
t4 = _cluster_step(
    'terminate_cluster', terminate_cluster, 1,
    {'ENV': ENV,
     'import_terminate_emr_cluster': import_terminate_emr_cluster})

# Linear chain t0 -> t1 -> t2 -> t3 -> t4.
t0.set_downstream(t1)
t1.set_downstream(t2)
t2.set_downstream(t3)
t3.set_downstream(t4)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment