Skip to content

Instantly share code, notes, and snippets.

@r39132
Created May 5, 2016 02:43
Show Gist options
  • Save r39132/c0d331bc54f113ddf8dbcf279f7d8814 to your computer and use it in GitHub Desktop.
Save r39132/c0d331bc54f113ddf8dbcf279f7d8814 to your computer and use it in GitHub Desktop.
import logging
import datetime
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators import PythonOperator, BashOperator, ExternalTaskSensor
from ep_telemetry_pipeline_utils import *
import rt_scorer_asg
# constants
# Base start_date: 10:00 (naive, server-local time) on the calendar day two
# days before "today".
# NOTE(review): this used to be labelled "8am GMT", but datetime.today() is
# naive local time, so that label only holds if the server runs at UTC+2 —
# confirm. Deriving start_date from today() also makes it shift on every
# parse of this file; Airflow recommends a fixed start_date.
START = (datetime.today() - timedelta(days=2)).replace(
    hour=10, minute=0, second=0, microsecond=0)
DAG_NAME = 'emr_model_building'
# Defaults applied to every task of this DAG (a task-level argument overrides).
default_args = dict(
    owner='kmandich',
    pool='ep_emr_model_building',
    depends_on_past=False,
    start_date=START,
    # Presumably provided by the star import from ep_telemetry_pipeline_utils.
    email=[import_ep_pipeline_alert_email_dl],
    retries=0,
    retry_delay=timedelta(seconds=120),
    email_on_failure=True,
    # NOTE(review): with retries=0 no retry happens, so this flag and
    # retry_delay are effectively unused.
    email_on_retry=True,
)
# Cron '0 1 * * *': one run per day at 01:00.
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='0 1 * * *')
# Bash command templates for the EMR tasks. Airflow renders them with Jinja,
# using each task's ``params``. Every cluster.sh call shares the same
# preamble and differs only in the action and in the config file selected by
# params.ENV, so the four templates are assembled from the helpers below.
# The resulting strings are exactly the ones previously written out by hand.


def _cluster_sh_lines(action, label):
    """Return the template lines that run ``./cluster.sh <action>``.

    When ``params.ENV == "EP_PROD"`` the call uses
    ``ep_model_building_prod.conf``; for any other value it uses
    ``ep_model_building_stage.conf``. Before the call, the template echoes
    ``"<label> in Prod Env"`` or ``"<label> in Stage Env"``.
    """
    def invoke(conf_suffix):
        return ('source ~/.bash_profile; cd /home/deploy/automation/roles/ep-cluster; '
                './cluster.sh ' + action + ' ep_model_building_' + conf_suffix + '.conf')

    return [
        '{% if params.ENV == "EP_PROD" %}',
        'echo "' + label + ' in Prod Env"',
        invoke('prod'),
        '{% else %}',
        'echo "' + label + ' in Stage Env"',
        invoke('stage'),
        '{% endif %}',
    ]


def _as_template(lines):
    """Join ``lines`` into one template string framed by leading/trailing newlines."""
    return '\n' + '\n'.join(lines) + '\n'


# Guard shared by the launch and terminate templates: those cluster.sh calls
# are only rendered when params.import_ep_terminate_emr_cluster is true.
_IF_TERMINATE_EMR = '{% if params.import_ep_terminate_emr_cluster == true %}'

launch_emr = _as_template(
    [_IF_TERMINATE_EMR]
    + _cluster_sh_lines('launch,datascience,ep', 'Launching EMR cluster')
    + ['{% endif %}'])
run_sm_and_reputation = _as_template(_cluster_sh_lines('sd', 'Building sender models'))
run_cdd = _as_template(_cluster_sh_lines('cdd', 'Building CDD'))
terminate_cluster = _as_template(
    [_IF_TERMINATE_EMR]
    + _cluster_sh_lines('terminate', 'Terminating EMR cluster')
    + ['{% else %}', 'echo "NOT terminating EMR cluster"', '{% endif %}'])
# Serialize runs: first wait until this same DAG's terminate_cluster task
# has succeeded in the run one day earlier.
# NOTE(review): nothing is wired upstream of t0 in this file, so
# trigger_rule presumably has no effect here — confirm it is intentional.
# NOTE(review): the pool from default_args also applies to this sensor; if
# that pool is small, a waiting sensor could hold a slot the earlier run
# still needs. Also confirm how the very first run (with no earlier run to
# wait on) is expected to be unblocked.
t0 = ExternalTaskSensor(
    dag=dag,
    task_id='wait_for_previous_run',
    external_dag_id=DAG_NAME,
    external_task_id='terminate_cluster',
    execution_delta=timedelta(days=1),
    allowed_states=['success'],
    trigger_rule='one_success',
)
# --- EMR model-building tasks ---------------------------------------------
# Each task renders one of the Jinja templates above. params.ENV selects the
# prod or stage cluster config. On the launch/terminate tasks,
# params.import_ep_terminate_emr_cluster gates whether cluster.sh
# launch/terminate is run at all.
# NOTE(review): pool= repeats the value already set in default_args.
t1 = BashOperator(
    dag=dag,
    task_id='launch_emr',
    pool='ep_emr_model_building',
    execution_timeout=timedelta(hours=1),
    bash_command=launch_emr,
    params=dict(ENV=ENV, import_ep_terminate_emr_cluster=import_ep_terminate_emr_cluster),
)
t2 = BashOperator(
    dag=dag,
    task_id='run_sm_and_reputation',
    pool='ep_emr_model_building',
    # Longest step in the chain.
    execution_timeout=timedelta(hours=16),
    bash_command=run_sm_and_reputation,
    params=dict(ENV=ENV),
)
t3 = BashOperator(
    dag=dag,
    task_id='run_cdd',
    pool='ep_emr_model_building',
    execution_timeout=timedelta(hours=6),
    bash_command=run_cdd,
    params=dict(ENV=ENV),
)
t4 = BashOperator(
    dag=dag,
    task_id='terminate_cluster',
    pool='ep_emr_model_building',
    execution_timeout=timedelta(hours=1),
    bash_command=terminate_cluster,
    params=dict(ENV=ENV, import_ep_terminate_emr_cluster=import_ep_terminate_emr_cluster),
)
# --- Real-time scorer refresh (runs after terminate_cluster) ---------------
# NOTE(review): unlike every other work task, t5 sets no execution_timeout;
# consider adding one so a hung update cannot stall the DAG indefinitely.
t5 = PythonOperator(
    task_id='rt_scorer_update_metadata_bundle',
    # PythonOperator-specific: pass the Airflow context to the callable as
    # keyword arguments.
    provide_context=True,
    python_callable=rt_scorer_asg.update_metadata_bundle,
    dag=dag)
# provide_context is not a BashOperator argument, so it is not passed here.
# Airflow 1.x only warns about such invalid arguments, and Airflow 2 rejects
# them. bash_command is a templated field rendered with the task context
# anyway.
t6 = BashOperator(
    task_id='rt_scorer_deploy_asg',
    bash_command=rt_scorer_asg.cmd,
    execution_timeout=timedelta(hours=1),
    dag=dag)
# Wire all tasks into one strictly sequential chain:
# wait_for_previous_run -> launch_emr -> run_sm_and_reputation -> run_cdd
#   -> terminate_cluster -> rt_scorer_update_metadata_bundle
#   -> rt_scorer_deploy_asg
t0.set_downstream(t1)
t1.set_downstream(t2)
t2.set_downstream(t3)
t3.set_downstream(t4)
t4.set_downstream(t5)
t5.set_downstream(t6)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment