from airflow import DAG, utils
from airflow.operators import *
from datetime import date, datetime, time, timedelta
from ep_telemetry_pipeline_utils import *
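# NOTE: the two star imports above supply the names used throughout this
# file -- the operator classes, plus pipeline helpers such as PLATFORM_VARS,
# SSH_KEY, SLAVES, CLEAR_LOGS, SPARK_DATE_FORMAT, compute_end_dt, and the
# python_callable functions wrapped by the operators below.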
now = datetime.now()
now_to_the_hour = now.replace(minute=0, second=0, microsecond=0)
START_DATE = now_to_the_hour + timedelta(hours=-3)
DAG_NAME = 'ep_telemetry_v2'
ORG_IDS = get_active_org_ids_string()
default_args = {
    'owner': 'sanand',
    'depends_on_past': False,
    'pool': 'ep_data_pipeline',
    'start_date': START_DATE,
    'email': [import_ep_pipeline_alert_email_dl],
    'email_on_failure': import_airflow_enable_notifications,
    'email_on_retry': import_airflow_enable_notifications,
    'retries': 1,
    'retry_delay': timedelta(seconds=120),
    'priority_weight': import_airflow_priority_weight
}
dag = DAG(DAG_NAME, schedule_interval='@hourly', default_args=default_args, sla_miss_callback=sla_alert_func)
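# With an @hourly schedule and a start_date three hours in the past, the
# scheduler backfills the last few hourly runs on first deployment before
# settling into one run per hour.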
# Operator DAG Design
# 0. Purge DLQ
# 1.a Get Collector counts for validation
# 1.b Delete db data
# 2. Generate Parquet files -- this also consumes new upload files and moves them into archived
# 2.a. Get counts of messages by org
# 3. Run Model Building
# 4. Run Aggregate Spark Job -- this generates aggregate data from Parquet files
# 5. Check SQS for new messages -- if no messages, branch --> send email
# 6. Check the DB for new rows within exec_date - 1d <--> exec_date
# 7. Wait for SQS to drain
# 8. Send an email that the flow completed successfully
# Operator : Wait for S3 delays related to collector_ingest copies
wait_for_collector_ingest = PythonOperator(
    task_id='wait_for_collector_ingest',
    provide_context=True,
    python_callable=wait_for_collector_ingest,
    dag=dag)
job_started_ping = PythonOperator(
    task_id='job_started_ping',
    provide_context=True,
    python_callable=ping_healthchecks_io,
    dag=dag)
job_started_ping.set_upstream(wait_for_collector_ingest)
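# task_a.set_upstream(task_b) makes task_b a prerequisite of task_a, so
# job_started_ping runs only after wait_for_collector_ingest succeeds. Also
# note that several assignments in this file rebind an imported function name
# to the operator that wraps it; this is safe because the python_callable
# reference is resolved before the name is rebound.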
# Operator : Build models only on the 0th hour!
check_for_time_to_build_model_branch_condition = BranchPythonOperator(
    task_id='check_for_time_to_build_model_branch_condition',
    provide_context=True,
    python_callable=check_for_time_to_build_model_branch_condition,
    dag=dag)
check_for_time_to_build_model_branch_condition.set_upstream(wait_for_collector_ingest)
prejoin_preagg_dummy_job = DummyOperator(
    task_id='prejoin_preagg_dummy_job',
    dag=dag)
prejoin_preagg_dummy_job.set_upstream(check_for_time_to_build_model_branch_condition)
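# BranchPythonOperator follows only the task_id returned by its callable and
# skips the other branch. A minimal sketch of what the branch callable might
# look like (hypothetical -- the real one is imported from
# ep_telemetry_pipeline_utils):
#
# def check_for_time_to_build_model_branch_condition(execution_date, **kwargs):
#     # Build models only on the day's 0th hour; otherwise skip straight to
#     # the dummy join task.
#     if execution_date.hour == 0:
#         return 'build_sender_models_spark_job'
#     return 'prejoin_preagg_dummy_job'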
# Model building : 3 steps: sender models, cdd, domain reputation generation.
# Operator : Build Sender model
sender_model_building_command = """
ssh -o 'StrictHostKeyChecking no' -i ~/.ssh/{{ params.SSH_KEY }} {{ params.USER }}@{{ params.CLUSTER_IP }} 'cd {{ params.HOME_DIR }}/spark/bin; . {{ params.HOME_DIR }}/.bash_profile; {{ params.HOME_DIR }}/run_model_building.sh sm'
"""
build_sender_models_spark_job = BashOperator(
    task_id='build_sender_models_spark_job',
    execution_timeout=timedelta(hours=3),
    pool='ep_data_pipeline_spark_tasks_only',
    bash_command=sender_model_building_command,
    params={'CLUSTER_IP': PLATFORM_VARS['ip'], 'USER': PLATFORM_VARS['ssh_user'], 'HOME_DIR': PLATFORM_VARS['home_dir'], 'SSH_KEY': SSH_KEY},
    dag=dag)
build_sender_models_spark_job.set_upstream(check_for_time_to_build_model_branch_condition)
# Operator : Build CDD model
cdd_model_building_command = """
ssh -o 'StrictHostKeyChecking no' -i ~/.ssh/{{ params.SSH_KEY }} {{ params.USER }}@{{ params.CLUSTER_IP }} 'cd {{ params.HOME_DIR }}/spark/bin; . {{ params.HOME_DIR }}/.bash_profile; {{ params.HOME_DIR }}/run_model_building.sh cdd'
"""
build_cdd_models_spark_job = BashOperator(
    task_id='build_cdd_models_spark_job',
    execution_timeout=timedelta(hours=3),
    pool='ep_data_pipeline_spark_tasks_only',
    bash_command=cdd_model_building_command,
    params={'CLUSTER_IP': PLATFORM_VARS['ip'], 'USER': PLATFORM_VARS['ssh_user'], 'HOME_DIR': PLATFORM_VARS['home_dir'], 'SSH_KEY': SSH_KEY},
    dag=dag)
build_cdd_models_spark_job.set_upstream(build_sender_models_spark_job)
# Operator : Build Domain Rep model
dom_rep_model_building_command = """
ssh -o 'StrictHostKeyChecking no' -i ~/.ssh/{{ params.SSH_KEY }} {{ params.USER }}@{{ params.CLUSTER_IP }} 'cd {{ params.HOME_DIR }}/spark/bin; . {{ params.HOME_DIR }}/.bash_profile; {{ params.HOME_DIR }}/run_model_building.sh dom_rep'
"""
build_dom_rep_models_spark_job = BashOperator(
    task_id='build_dom_rep_models_spark_job',
    execution_timeout=timedelta(hours=3),
    pool='ep_data_pipeline_spark_tasks_only',
    bash_command=dom_rep_model_building_command,
    params={'CLUSTER_IP': PLATFORM_VARS['ip'], 'USER': PLATFORM_VARS['ssh_user'], 'HOME_DIR': PLATFORM_VARS['home_dir'], 'SSH_KEY': SSH_KEY},
    dag=dag)
build_dom_rep_models_spark_job.set_upstream(build_cdd_models_spark_job)
# Operator : Check for Model Building Success
check_for_successful_model_building_branch_condition = BranchPythonOperator(
    task_id='check_for_successful_model_building_branch_condition',
    provide_context=True,
    python_callable=check_for_successful_model_building_branch_condition,
    dag=dag)
check_for_successful_model_building_branch_condition.set_upstream(build_dom_rep_models_spark_job)
# Operator : Send SNS notification when model building fails
send_sns_notification_model_building_failed = PythonOperator(
    task_id='send_sns_notification_model_building_failed',
    provide_context=True,
    python_callable=send_sns_notification_model_building_failed,
    dag=dag)
send_sns_notification_model_building_failed.set_upstream(check_for_successful_model_building_branch_condition)
c_params = {
    "compute_end_dt": compute_end_dt
}
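# compute_end_dt is imported from ep_telemetry_pipeline_utils; the templated
# commands below call it as params.compute_end_dt(execution_date, task_instance)
# to get the end of the window being processed. A plausible sketch
# (hypothetical -- the real logic may differ):
#
# def compute_end_dt(execution_date, task_instance):
#     # Process one schedule interval: [execution_date, execution_date + 1h)
#     return execution_date + timedelta(hours=1)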
wait_for_previous_hour = ExternalTaskSensor(
    task_id='wait_for_previous_hour',
    trigger_rule='one_success',
    external_dag_id=DAG_NAME,
    external_task_id='send_email_notification_flow_successful',
    allowed_states=['success'],
    execution_delta=timedelta(hours=1),
    dag=dag)
wait_for_previous_hour.set_upstream(check_for_successful_model_building_branch_condition)
wait_for_previous_hour.set_upstream(prejoin_preagg_dummy_job)
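# This sensor serializes the hourly runs: with execution_delta=timedelta(hours=1)
# it waits for the send_email_notification_flow_successful task of the run one
# hour earlier in this same DAG. trigger_rule='one_success' lets it fire as
# soon as either upstream branch (model building or the dummy join) succeeds.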
# Operator : Purge DLQ
purge_DLQ = PythonOperator(
    task_id='purge_DLQ',
    provide_context=True,
    python_callable=purge_DLQ,
    dag=dag)
purge_DLQ.set_upstream(wait_for_previous_hour)
# Operator : Clean up the disk to ensure the run can proceed!
clear_spark_logs_cmd = '''
{% if params.CLEAR_LOGS %}
{% for slave in params.SLAVES %}
ssh -o 'StrictHostKeyChecking no' -i ~/.ssh/{{ params.SSH_KEY }} {{ params.USER }}@{{ slave }} 'rm -rf /mnt/var/log/tmp/var/log/hadoop-yarn/containers/application_*; sudo rm -rf /mnt/var/log/hadoop-yarn/containers/application_*'
{% endfor %}
{% endif %}
'''
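# When CLEAR_LOGS is truthy, the template renders one ssh line per entry in
# SLAVES; when it is falsy, the rendered command is empty and the task is
# effectively a no-op.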
clear_spark_logs = BashOperator(
    task_id='clear_spark_logs',
    execution_timeout=timedelta(minutes=10),
    pool='ep_data_pipeline',
    bash_command=clear_spark_logs_cmd,
    params={'SLAVES': SLAVES, 'CLEAR_LOGS': CLEAR_LOGS, 'USER': PLATFORM_VARS['ssh_user'], 'HOME_DIR': PLATFORM_VARS['home_dir'], 'SSH_KEY': SSH_KEY},
    dag=dag)
clear_spark_logs.set_upstream(wait_for_previous_hour)
# Operator : Delete data from target tables
delete_db_data = PythonOperator(
    task_id='delete_db_data',
    provide_context=True,
    python_callable=delete_db_data,
    dag=dag)
delete_db_data.set_upstream(wait_for_previous_hour)
# Operator : Discover Ingest-enabled Orgs
discover_ingest_enabled_orgs = PythonOperator(
    task_id='discover_ingest_enabled_orgs',
    provide_context=False,
    python_callable=get_org_ingest_dict,
    dag=dag)
discover_ingest_enabled_orgs.set_upstream(wait_for_previous_hour)
# Operator : Read the parquet data and generate aggregate data
# 5 args: -s -e -f -o -u
optional_aggregation_templated_command = """
ssh -o 'StrictHostKeyChecking no' -i ~/.ssh/{{ params.SSH_KEY }} {{ params.USER }}@{{ params.CLUSTER_IP }} 'cd {{ params.HOME_DIR }}/spark/bin; . {{ params.HOME_DIR }}/.bash_profile; {{ params.HOME_DIR }}/run_aggregation.sh {{ execution_date.strftime(params.spark_date_format) }} {{ params.compute_end_dt(execution_date, task_instance).strftime(params.spark_date_format) }} {{ params.granularity_secs }} {{ params.org_ids }}'
"""
aggregate_data_spark_job = BashOperator(
    task_id='aggregate_data_spark_job',
    execution_timeout=timedelta(hours=import_ep_pipeline_aggregation_timeout),
    priority_weight=5,
    pool='ep_data_pipeline_spark_tasks_only',
    bash_command=optional_aggregation_templated_command,
    on_retry_callback=spark_agg_retry,
    params={'CLUSTER_IP': PLATFORM_VARS['ip'],
            'USER': PLATFORM_VARS['ssh_user'],
            'HOME_DIR': PLATFORM_VARS['home_dir'],
            'SSH_KEY': SSH_KEY,
            'granularity_secs': import_airflow_granularity_secs,
            'compute_end_dt': c_params["compute_end_dt"],
            'spark_date_format': SPARK_DATE_FORMAT,
            'org_ids': ORG_IDS},
    dag=dag)
aggregate_data_spark_job.set_upstream(purge_DLQ)
aggregate_data_spark_job.set_upstream(clear_spark_logs)
aggregate_data_spark_job.set_upstream(delete_db_data)
aggregate_data_spark_job.set_upstream(discover_ingest_enabled_orgs)
# Operator : Check for new data in DB
wait_for_new_data_in_db = PythonOperator(
    task_id='wait_for_new_data_in_db',
    provide_context=True,
    python_callable=wait_for_new_data_in_db_simple,
    dag=dag)
wait_for_new_data_in_db.set_upstream(aggregate_data_spark_job)
# Operator : Check for SQS Queue to be drained
wait_for_empty_queue = PythonOperator(
    task_id='wait_for_empty_queue',
    provide_context=True,
    python_callable=wait_for_empty_queue,
    dag=dag)
wait_for_empty_queue.set_upstream(wait_for_new_data_in_db)
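# A sketch of what a drain check like wait_for_empty_queue might do
# (hypothetical -- the real callable is imported from
# ep_telemetry_pipeline_utils; the queue name and boto3 usage are assumptions):
#
# import time
# import boto3
#
# def wait_for_empty_queue(**context):
#     sqs = boto3.client('sqs')
#     queue_url = sqs.get_queue_url(QueueName='ep-telemetry-queue')['QueueUrl']
#     while True:
#         attrs = sqs.get_queue_attributes(
#             QueueUrl=queue_url,
#             AttributeNames=['ApproximateNumberOfMessages'])
#         if int(attrs['Attributes']['ApproximateNumberOfMessages']) == 0:
#             return
#         time.sleep(60)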
# Operator : Kick off message aggregation
aggregate_db_message_job_cmd = '''
(cd /usr/local/agari/cousteau/production/current/; RAILS_ENV=production bundle exec bin/generate-message-aggregates --start-date {{execution_date.strftime('%Y-%m-%d')}})
'''
aggregate_db_message_job = BashOperator(
    task_id='aggregate_db_message_job',
    execution_timeout=timedelta(hours=3),
    pool='ep_data_pipeline_db_msg_agg',
    bash_command=aggregate_db_message_job_cmd,
    dag=dag)
aggregate_db_message_job.set_upstream(wait_for_empty_queue)
# Operator : Get collector message counts (used to be local)
get_collector_msg_counts_cmd = '''
ssh -o 'StrictHostKeyChecking no' -i ~/.ssh/{{ params.SSH_KEY }} {{ params.USER }}@{{ params.CLUSTER_IP }} 'cd {{ params.HOME_DIR }}/spark/bin; . {{ params.HOME_DIR }}/.bash_profile; {{ params.HOME_DIR }}/run_msg_counts.sh {{ execution_date.strftime(params.spark_date_format) }} {{ params.compute_end_dt(execution_date, task_instance).strftime(params.spark_date_format) }} {{ params.granularity_secs }} {{ params.org_ids }}'
'''
get_collector_msg_counts = BashOperator(
    task_id='get_collector_msg_counts',
    execution_timeout=timedelta(hours=1),
    pool='ep_data_pipeline_spark_tasks_only',
    bash_command=get_collector_msg_counts_cmd,
    params={'CLUSTER_IP': PLATFORM_VARS['ip'],
            'USER': PLATFORM_VARS['ssh_user'],
            'HOME_DIR': PLATFORM_VARS['home_dir'],
            'SSH_KEY': SSH_KEY,
            'granularity_secs': import_airflow_granularity_secs,
            'compute_end_dt': c_params["compute_end_dt"],
            'spark_date_format': SPARK_DATE_FORMAT,
            'org_ids': ORG_IDS},
    dag=dag)
get_collector_msg_counts.set_upstream(aggregate_db_message_job)
# Operator : Send Email when flow completes successfully
send_email_notification_flow_successful = PythonOperator(
    task_id='send_email_notification_flow_successful',
    execution_timeout=timedelta(minutes=15),
    pool='ep_data_pipeline_metrics_gathering',
    provide_context=True,
    sla=timedelta(hours=2),
    python_callable=send_email_notification_flow_successful,
    dag=dag)
send_email_notification_flow_successful.set_upstream(get_collector_msg_counts)
# Operator : Enqueue a job for each organization via an SQS Queue
enqueue_alerting_jobs = PythonOperator(
    task_id='enqueue_alerting_jobs',
    provide_context=True,
    depends_on_past=True,
    python_callable=enqueue_alerting_jobs,
    dag=dag)
enqueue_alerting_jobs.set_upstream(wait_for_empty_queue)
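# depends_on_past=True overrides the DAG-level default of False: each hourly
# enqueue_alerting_jobs instance runs only after its own previous instance
# has completed successfully, keeping alerting jobs enqueued in order.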
# Operator : optionally_run_vacuum_analyze
check_for_time_to_run_vacuum_analyze_condition = ShortCircuitOperator(
    task_id='check_for_time_to_run_vacuum_analyze_condition',
    pool='ep_pipeline_vacuum_analyze',
    provide_context=True,
    python_callable=check_for_time_to_run_vacuum_analyze_condition,
    dag=dag)
check_for_time_to_run_vacuum_analyze_condition.set_upstream(get_collector_msg_counts)
# Operator : run vacuum analyze
run_vacuum_analyze = PythonOperator(
    task_id='run_vacuum_analyze',
    pool='ep_pipeline_vacuum_analyze',
    provide_context=True,
    python_callable=run_vacuum_analyze,
    dag=dag)
run_vacuum_analyze.set_upstream(check_for_time_to_run_vacuum_analyze_condition)
# Operator : Send Slack when flow completes successfully
#post_completion_to_slack_job = SlackAPIPostOperator(
#    task_id='post_completion_to_slack_job',
#    token=slack_api_token,
#    channel=ep_ops_slack_channel_name,
#    text='{{dag.dag_id}} on {{ ti.hostname }} completed successfully for `{{execution_date}}`',
#    icon_url='http://airbnb.io/img/projects/airflow3.png',
#    dag=dag)
#post_completion_to_slack_job.set_upstream(send_email_notification_flow_successful)