Created
May 5, 2016 02:43
-
-
Save r39132/c0d331bc54f113ddf8dbcf279f7d8814 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging
import datetime
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators import PythonOperator, BashOperator, ExternalTaskSensor

from ep_telemetry_pipeline_utils import *
import rt_scorer_asg
# constants

# First schedulable date: midnight two days before "today" (naive, local
# clock of the host parsing this file) plus ten hours.
# NOTE(review): the original comment said "8am GMT" while the offset is
# 10 hours -- confirm which one is intended (host timezone may explain it).
START = datetime.combine(
    datetime.today() - timedelta(days=2),
    datetime.min.time()) + timedelta(hours=10)

DAG_NAME = 'emr_model_building'

# Defaults applied to every task in this DAG unless the task overrides them.
default_args = {
    'owner': 'kmandich',
    'pool': 'ep_emr_model_building',
    'depends_on_past': False,
    'start_date': START,
    # presumably supplied by the star import from ep_telemetry_pipeline_utils
    'email': [import_ep_pipeline_alert_email_dl],
    # Retries are disabled here; retry_delay / email_on_retry only come into
    # play if retries is raised (here or on an individual task).
    'retries': 0,
    'retry_delay': timedelta(seconds=120),
    'email_on_failure': True,
    'email_on_retry': True,
}

# Cron '0 1 * * *': one run per day at 01:00.
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='0 1 * * *')
# Jinja-templated bash script for the 'launch_emr' task.
# Commands are emitted only when params.import_ep_terminate_emr_cluster is
# true; otherwise the template renders no commands at all.
# params.ENV == "EP_PROD" selects the prod cluster config, anything else the
# stage config.
# NOTE(review): launching is gated on the *terminate* flag -- presumably a
# cluster that is never terminated is reused instead of relaunched; confirm.
launch_emr = """
{% if params.import_ep_terminate_emr_cluster == true %}
{% if params.ENV == "EP_PROD" %}
echo "Launching EMR cluster in Prod Env"
source ~/.bash_profile; cd /home/deploy/automation/roles/ep-cluster; ./cluster.sh launch,datascience,ep ep_model_building_prod.conf
{% else %}
echo "Launching EMR cluster in Stage Env"
source ~/.bash_profile; cd /home/deploy/automation/roles/ep-cluster; ./cluster.sh launch,datascience,ep ep_model_building_stage.conf
{% endif %}
{% endif %}
"""
# Jinja-templated bash script for the 'run_sm_and_reputation' task: runs
# `cluster.sh sd` with the prod config when params.ENV == "EP_PROD" and with
# the stage config otherwise.
run_sm_and_reputation = """
{% if params.ENV == "EP_PROD" %}
echo "Building sender models in Prod Env"
source ~/.bash_profile; cd /home/deploy/automation/roles/ep-cluster; ./cluster.sh sd ep_model_building_prod.conf
{% else %}
echo "Building sender models in Stage Env"
source ~/.bash_profile; cd /home/deploy/automation/roles/ep-cluster; ./cluster.sh sd ep_model_building_stage.conf
{% endif %}
"""
# Jinja-templated bash script for the 'run_cdd' task: runs `cluster.sh cdd`
# with the prod config when params.ENV == "EP_PROD" and with the stage
# config otherwise.
run_cdd = """
{% if params.ENV == "EP_PROD" %}
echo "Building CDD in Prod Env"
source ~/.bash_profile; cd /home/deploy/automation/roles/ep-cluster; ./cluster.sh cdd ep_model_building_prod.conf
{% else %}
echo "Building CDD in Stage Env"
source ~/.bash_profile; cd /home/deploy/automation/roles/ep-cluster; ./cluster.sh cdd ep_model_building_stage.conf
{% endif %}
"""
# Jinja-templated bash script for the 'terminate_cluster' task.
# When params.import_ep_terminate_emr_cluster is true it runs
# `cluster.sh terminate` against the prod config (params.ENV == "EP_PROD")
# or the stage config (any other ENV); when the flag is not true it only
# echoes that the cluster is being left running.
terminate_cluster = """
{% if params.import_ep_terminate_emr_cluster == true %}
{% if params.ENV == "EP_PROD" %}
echo "Terminating EMR cluster in Prod Env"
source ~/.bash_profile; cd /home/deploy/automation/roles/ep-cluster; ./cluster.sh terminate ep_model_building_prod.conf
{% else %}
echo "Terminating EMR cluster in Stage Env"
source ~/.bash_profile; cd /home/deploy/automation/roles/ep-cluster; ./cluster.sh terminate ep_model_building_stage.conf
{% endif %}
{% else %}
echo "NOT terminating EMR cluster"
{% endif %}
"""
# Gate each run on the previous run of this same DAG: wait until the
# 'terminate_cluster' task whose execution date is one day earlier has
# reached the 'success' state.
# NOTE(review): the very first scheduled run has no predecessor to wait
# for -- confirm how that run is bootstrapped.
t0 = ExternalTaskSensor(
    task_id='wait_for_previous_run',
    trigger_rule='one_success',
    external_dag_id=DAG_NAME,
    external_task_id='terminate_cluster',
    allowed_states=['success'],
    execution_delta=timedelta(days=1),
    dag=dag)
# EMR tasks. Each one renders its bash template with `params` and runs in
# the 'ep_emr_model_building' pool. ENV and import_ep_terminate_emr_cluster
# are presumably supplied by the star import from
# ep_telemetry_pipeline_utils.

# Launch the EMR cluster (template: launch_emr); allowed up to 1 hour.
t1 = BashOperator(
    task_id='launch_emr',
    bash_command=launch_emr,
    execution_timeout=timedelta(hours=1),
    pool='ep_emr_model_building',
    params={'ENV': ENV, 'import_ep_terminate_emr_cluster': import_ep_terminate_emr_cluster},
    dag=dag)

# Build sender models (template: run_sm_and_reputation); allowed up to 16 hours.
t2 = BashOperator(
    task_id='run_sm_and_reputation',
    bash_command=run_sm_and_reputation,
    execution_timeout=timedelta(hours=16),
    pool='ep_emr_model_building',
    params={'ENV': ENV},
    dag=dag)

# Build CDD (template: run_cdd); allowed up to 6 hours.
t3 = BashOperator(
    task_id='run_cdd',
    bash_command=run_cdd,
    execution_timeout=timedelta(hours=6),
    pool='ep_emr_model_building',
    params={'ENV': ENV},
    dag=dag)

# Terminate the cluster, or just log that it is kept (template:
# terminate_cluster); allowed up to 1 hour. The next day's sensor
# ('wait_for_previous_run') waits on this task_id.
t4 = BashOperator(
    task_id='terminate_cluster',
    bash_command=terminate_cluster,
    execution_timeout=timedelta(hours=1),
    params={'ENV': ENV, 'import_ep_terminate_emr_cluster': import_ep_terminate_emr_cluster},
    pool='ep_emr_model_building',
    dag=dag)
# Real-time scorer tasks; both the callable and the bash command come from
# the project module rt_scorer_asg. Neither task sets a pool explicitly, so
# the pool from default_args applies.

# Calls rt_scorer_asg.update_metadata_bundle with the Airflow context.
t5 = PythonOperator(
    task_id='rt_scorer_update_metadata_bundle',
    provide_context=True,
    python_callable=rt_scorer_asg.update_metadata_bundle,
    dag=dag)

# Runs rt_scorer_asg.cmd as a bash command; allowed up to 1 hour.
# NOTE(review): provide_context is a PythonOperator option -- confirm that
# BashOperator accepts/needs it in the Airflow version deployed here.
t6 = BashOperator(
    task_id='rt_scorer_deploy_asg',
    provide_context=True,
    bash_command=rt_scorer_asg.cmd,
    execution_timeout=timedelta(hours=1),
    dag=dag)
# Strictly linear pipeline:
# t0 (wait for previous run) -> t1 (launch) -> t2 (sender models)
#   -> t3 (CDD) -> t4 (terminate) -> t5 (metadata bundle) -> t6 (deploy ASG)
t1.set_upstream(t0)
t2.set_upstream(t1)
t3.set_upstream(t2)
t4.set_upstream(t3)
t5.set_upstream(t4)
t6.set_upstream(t5)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment