# Created February 12, 2016 02:03
# Source: GitHub gist r39132/8a6b0433ec25450b50e9.
# GitHub notice: this file contains bidirectional Unicode text that may be
# interpreted or compiled differently than what appears below. To review it,
# open the file in an editor that reveals hidden Unicode characters.
import logging | |
import datetime | |
from datetime import datetime, timedelta | |
from airflow import DAG | |
from airflow.operators import BashOperator, ExternalTaskSensor | |
from telemetry_pipeline_utils import * | |
# constants

# Naive local timestamp: 10:00:00 on the calendar day two days before the
# moment this module is evaluated.
# NOTE(review): because it is derived from datetime.today(), this start_date
# shifts forward every day the DAG file is parsed; Airflow's docs recommend a
# fixed start_date. Also, 10:00 does not line up with the '0 1 * * *'
# schedule used for the DAG — confirm both are intended.
START = (datetime.today() - timedelta(days=2)).replace(
    hour=10, minute=0, second=0, microsecond=0)
DAG_NAME = 'emr_model_building'

# Task-level defaults handed to the DAG via default_args.
# NOTE(review): email_on_failure / email_on_retry are enabled, but no 'email'
# recipient is configured here — confirm alert mail is actually delivered.
default_args = dict(
    pool='emr_model_building',
    depends_on_past=False,
    start_date=START,
    retries=1,
    retry_delay=timedelta(seconds=120),
    email_on_failure=True,
    email_on_retry=True,
)

# Cron '0 1 * * *': once a day at 01:00.
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='0 1 * * *')
# Jinja-templated bash scripts that drive the EMR cluster lifecycle through
# cluster.sh. Each picks model_building_prod.conf when params.ENV == "PROD";
# any other ENV value falls through to model_building_stage.conf.

# Prefix shared by every cluster.sh invocation; ~/.bash_profile is sourced
# first (presumably so cluster.sh sees the deploy user's environment).
_CLUSTER_SH = ('source ~/.bash_profile; '
               '/home/deploy/automation/roles/cluster/cluster.sh')


def _env_cluster_script(action, activity):
    """Return a Jinja/bash snippet running ``cluster.sh <action>`` for params.ENV.

    :param action: cluster.sh sub-command, e.g. ``"cdd"`` or ``"terminate"``.
    :param activity: phrase echoed before the command; it is suffixed with
        " in Prod Env" or " in Stage Env" depending on the branch taken.
    :returns: the snippet as a string, ending with ``{% endif %}`` + newline.
    """
    def _branch(env_label, conf_suffix):
        # One echo line plus the cluster.sh call against that env's conf file.
        return ['echo "' + activity + ' in ' + env_label + ' Env"',
                _CLUSTER_SH + ' ' + action + ' model_building_' + conf_suffix + '.conf']

    lines = (['{% if params.ENV == "PROD" %}'] + _branch('Prod', 'prod')
             + ['{% else %}'] + _branch('Stage', 'stage')
             + ['{% endif %}'])
    return '\n'.join(lines) + '\n'


launch_emr = '\n' + _env_cluster_script('launch,provision,deploy',
                                        'Launching EMR cluster')
run_sm_and_reputation = '\n' + _env_cluster_script('sd', 'Building sender models')
run_cdd = '\n' + _env_cluster_script('cdd', 'Building CDD')

# Terminates the cluster only when params.import_terminate_emr_cluster equals
# Jinja `true`; otherwise it just echoes that termination is skipped.
# NOTE(review): if the flag arrives as a string such as "true", the `== true`
# comparison is false and the cluster is left running — confirm its type in
# telemetry_pipeline_utils.
terminate_cluster = (
    '\n'
    '{% if params.import_terminate_emr_cluster == true %}\n'
    + _env_cluster_script('terminate', 'Terminating EMR cluster')
    + '{% else %}\n'
    'echo "NOT terminating EMR cluster"\n'
    '{% endif %}\n'
)
# Gate each run on the previous day's run of this same DAG: wait until that
# run's 'terminate_cluster' task is in state 'success'.
# NOTE(review): no pool is passed here, so this sensor presumably inherits
# 'emr_model_building' from default_args and occupies a slot of that pool
# while it waits; with a small pool it could block the previous run's tasks
# — confirm the pool size.
# NOTE(review): the very first run has no run one day earlier to wait on, and
# no explicit timeout is set, so it can only finish by the sensor's default
# timeout — confirm how first runs are handled operationally.
# Nothing in this file sets an upstream for t0, so the trigger_rule below is
# presumably inert.
t0 = ExternalTaskSensor(
    dag=dag,
    task_id='wait_for_previous_run',
    external_dag_id=DAG_NAME,
    external_task_id='terminate_cluster',
    execution_delta=timedelta(days=1),
    allowed_states=['success'],
    trigger_rule='one_success',
)
def _cluster_task(task_id, bash_command, timeout_hours, **params):
    """Create a BashOperator in ``dag`` that runs one cluster.sh pipeline step.

    :param task_id: Airflow task id.
    :param bash_command: Jinja-templated bash script to execute.
    :param timeout_hours: execution_timeout for the task, in hours.
    :param params: values exposed to the template as ``params.<name>``.
    :returns: the created BashOperator.
    """
    return BashOperator(
        task_id=task_id,
        bash_command=bash_command,
        execution_timeout=timedelta(hours=timeout_hours),
        pool='emr_model_building',
        params=params,
        dag=dag,
    )


# ENV and import_terminate_emr_cluster are not defined in this file; they
# presumably come from `from telemetry_pipeline_utils import *` — verify.
t1 = _cluster_task('launch_emr', launch_emr, 6,
                   ENV=ENV,
                   import_terminate_emr_cluster=import_terminate_emr_cluster)
t2 = _cluster_task('run_sm_and_reputation', run_sm_and_reputation, 3, ENV=ENV)
t3 = _cluster_task('run_cdd', run_cdd, 3, ENV=ENV)
t4 = _cluster_task('terminate_cluster', terminate_cluster, 1,
                   ENV=ENV,
                   import_terminate_emr_cluster=import_terminate_emr_cluster)
# Strictly linear pipeline:
# wait_for_previous_run -> launch_emr -> run_sm_and_reputation -> run_cdd
#   -> terminate_cluster
_pipeline = [t0, t1, t2, t3, t4]
for _upstream, _downstream in zip(_pipeline, _pipeline[1:]):
    _downstream.set_upstream(_upstream)
# (GitHub page footer: sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment.)