@r39132
Created April 29, 2016 18:09
from airflow import DAG, utils
from airflow.operators import *
from datetime import date, datetime, time, timedelta
from ep_telemetry_pipeline_utils import *
now = datetime.now()
now_to_the_hour = now.replace(minute=0, second=0, microsecond=0)  # truncate to the top of the current hour
START_DATE = now_to_the_hour + timedelta(hours=-3)
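# START_DATE is the top of the current hour minus three hours; combined with the
# hourly schedule below, this limits initial backfill to roughly the last few hours.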
DAG_NAME = 'ep_telemetry_v2'
ORG_IDS = get_active_org_ids_string()
default_args = {
    'owner': 'sanand',
    'depends_on_past': False,
    'pool': 'ep_data_pipeline',
    'start_date': START_DATE,
    'email': [import_ep_pipeline_alert_email_dl],
    'email_on_failure': import_airflow_enable_notifications,
    'email_on_retry': import_airflow_enable_notifications,
    'retries': 1,
    'retry_delay': timedelta(seconds=120),
    'priority_weight': import_airflow_priority_weight
}
dag = DAG(DAG_NAME, schedule_interval='@hourly', default_args=default_args, sla_miss_callback=sla_alert_func)
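# Note: default_args above apply to every task in this DAG unless a task overrides them;
# the DAG runs hourly and reports SLA misses through sla_alert_func.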
# Operator DAG Design
# 0. Purge DLQ
# 1.a Get Collector counts for validation
# 1.b Delete db data
# 2. Generate Parquet files -- this also consumes new upload files and moves them into archived
# 2.a. Get counts of messages by org
# 3. Run Model Building
# 4. Run Aggregate Spark Job -- this generates aggregate data from Parquet files
# 5. Check SQS for new messages -- if no messages, branch --> send email
# 6. Check the DB for new rows within exec_date - 1d <--> exec_date
# 7. Wait for SQS to drain
# 8. Send an email that the flow completed successfully
# Operator : Wait for S3 delays related to collector_ingest copies
wait_for_collector_ingest = PythonOperator(
    task_id='wait_for_collector_ingest',
    provide_context=True,
    python_callable=wait_for_collector_ingest,
    dag=dag)
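# Note: python_callable is bound to the wait_for_collector_ingest *function* (presumably
# provided by the wildcard import above) at construction time; the module-level name is
# then rebound to this operator. The same shadowing pattern repeats for several tasks below.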
job_started_ping = PythonOperator(
    task_id='job_started_ping',
    provide_context=True,
    python_callable=ping_healthchecks_io,
    dag=dag)
job_started_ping.set_upstream(wait_for_collector_ingest)
# Operator : Build models only at the 0th hour!
check_for_time_to_build_model_branch_condition = BranchPythonOperator(
    task_id='check_for_time_to_build_model_branch_condition',
    provide_context=True,
    python_callable=check_for_time_to_build_model_branch_condition,
    dag=dag)
check_for_time_to_build_model_branch_condition.set_upstream(wait_for_collector_ingest)
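# The branch callable is expected to return the task_id to follow (presumably the
# model-building chain at hour 0, otherwise the dummy pre-aggregation task below);
# tasks on the branch that is not chosen are marked skipped.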
prejoin_preagg_dummy_job = DummyOperator(
    task_id='prejoin_preagg_dummy_job',
    dag=dag)
prejoin_preagg_dummy_job.set_upstream(check_for_time_to_build_model_branch_condition)
# Model building : 3 steps: sender models, cdd, domain reputation generation.
# Operator : Build Sender model
sender_model_building_command = """
ssh -o 'StrictHostKeyChecking no' -i ~/.ssh/{{ params.SSH_KEY }} {{ params.USER }}@{{ params.CLUSTER_IP }} 'cd {{ params.HOME_DIR }}/spark/bin; . {{ params.HOME_DIR }}/.bash_profile; {{ params.HOME_DIR }}/run_model_building.sh sm'
"""
build_sender_models_spark_job = BashOperator(
    task_id='build_sender_models_spark_job',
    execution_timeout=timedelta(hours=3),
    pool='ep_data_pipeline_spark_tasks_only',
    bash_command=sender_model_building_command,
    params={'CLUSTER_IP': PLATFORM_VARS['ip'], 'USER': PLATFORM_VARS['ssh_user'], 'HOME_DIR': PLATFORM_VARS['home_dir'], 'SSH_KEY': SSH_KEY},
    dag=dag)
build_sender_models_spark_job.set_upstream(check_for_time_to_build_model_branch_condition)
# Operator : Build CDD model
cdd_model_building_command = """
ssh -o 'StrictHostKeyChecking no' -i ~/.ssh/{{ params.SSH_KEY }} {{ params.USER }}@{{ params.CLUSTER_IP }} 'cd {{ params.HOME_DIR }}/spark/bin; . {{ params.HOME_DIR }}/.bash_profile; {{ params.HOME_DIR }}/run_model_building.sh cdd'
"""
build_cdd_models_spark_job = BashOperator(
    task_id='build_cdd_models_spark_job',
    execution_timeout=timedelta(hours=3),
    pool='ep_data_pipeline_spark_tasks_only',
    bash_command=cdd_model_building_command,
    params={'CLUSTER_IP': PLATFORM_VARS['ip'], 'USER': PLATFORM_VARS['ssh_user'], 'HOME_DIR': PLATFORM_VARS['home_dir'], 'SSH_KEY': SSH_KEY},
    dag=dag)
build_cdd_models_spark_job.set_upstream(build_sender_models_spark_job)
# Operator : Build Domain Rep model
dom_rep_model_building_command = """
ssh -o 'StrictHostKeyChecking no' -i ~/.ssh/{{ params.SSH_KEY }} {{ params.USER }}@{{ params.CLUSTER_IP }} 'cd {{ params.HOME_DIR }}/spark/bin; . {{ params.HOME_DIR }}/.bash_profile; {{ params.HOME_DIR }}/run_model_building.sh dom_rep'
"""
build_dom_rep_models_spark_job = BashOperator(
    task_id='build_dom_rep_models_spark_job',
    execution_timeout=timedelta(hours=3),
    pool='ep_data_pipeline_spark_tasks_only',
    bash_command=dom_rep_model_building_command,
    params={'CLUSTER_IP': PLATFORM_VARS['ip'], 'USER': PLATFORM_VARS['ssh_user'], 'HOME_DIR': PLATFORM_VARS['home_dir'], 'SSH_KEY': SSH_KEY},
    dag=dag)
build_dom_rep_models_spark_job.set_upstream(build_cdd_models_spark_job)
# Operator : Check for Model Building Success
check_for_successful_model_building_branch_condition = BranchPythonOperator(
    task_id='check_for_successful_model_building_branch_condition',
    provide_context=True,
    python_callable=check_for_successful_model_building_branch_condition,
    dag=dag)
check_for_successful_model_building_branch_condition.set_upstream(build_dom_rep_models_spark_job)
# Operator : Send SNS notification if model building fails
send_sns_notification_model_building_failed = PythonOperator(
    task_id='send_sns_notification_model_building_failed',
    provide_context=True,
    python_callable=send_sns_notification_model_building_failed,
    dag=dag)
send_sns_notification_model_building_failed.set_upstream(check_for_successful_model_building_branch_condition)
c_params = {
    "compute_end_dt": compute_end_dt
}
wait_for_previous_hour = ExternalTaskSensor(
    task_id='wait_for_previous_hour',
    trigger_rule='one_success',
    external_dag_id=DAG_NAME,
    external_task_id='send_email_notification_flow_successful',
    allowed_states=['success'],
    execution_delta=timedelta(hours=1),
    dag=dag)
wait_for_previous_hour.set_upstream(check_for_successful_model_building_branch_condition)
wait_for_previous_hour.set_upstream(prejoin_preagg_dummy_job)
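# This sensor gates each hourly run on the previous hour's successful completion
# (same DAG, execution_delta of one hour). trigger_rule='one_success' lets it fire
# even though one of its upstream tasks may be skipped by the branch above.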
# Operator : Purge DLQ
purge_DLQ = PythonOperator(
    task_id='purge_DLQ',
    provide_context=True,
    python_callable=purge_DLQ,
    dag=dag)
purge_DLQ.set_upstream(wait_for_previous_hour)
# Operator : Clean up the disk to ensure the run can proceed!
clear_spark_logs_cmd = '''
{% if params.CLEAR_LOGS %}
{% for slave in params.SLAVES %}
ssh -o 'StrictHostKeyChecking no' -i ~/.ssh/{{ params.SSH_KEY }} {{ params.USER }}@{{ slave }} 'rm -rf /mnt/var/log/tmp/var/log/hadoop-yarn/containers/application_*; sudo rm -rf /mnt/var/log/hadoop-yarn/containers/application_*'
{% endfor %}
{% endif %}
'''
clear_spark_logs = BashOperator(
    task_id='clear_spark_logs',
    execution_timeout=timedelta(minutes=10),
    pool='ep_data_pipeline',
    bash_command=clear_spark_logs_cmd,
    params={'SLAVES': SLAVES, 'CLEAR_LOGS': CLEAR_LOGS, 'USER': PLATFORM_VARS['ssh_user'], 'HOME_DIR': PLATFORM_VARS['home_dir'], 'SSH_KEY': SSH_KEY},
    dag=dag)
clear_spark_logs.set_upstream(wait_for_previous_hour)
# Operator : Delete data from target tables
delete_db_data = PythonOperator(
    task_id='delete_db_data',
    provide_context=True,
    python_callable=delete_db_data,
    dag=dag)
delete_db_data.set_upstream(wait_for_previous_hour)
# Operator : Discover Ingest-enabled Orgs
discover_ingest_enabled_orgs = PythonOperator(
    task_id='discover_ingest_enabled_orgs',
    provide_context=False,
    python_callable=get_org_ingest_dict,
    dag=dag)
discover_ingest_enabled_orgs.set_upstream(wait_for_previous_hour)
# Operator : Read the parquet data and generate aggregate data
# 5 args: -s -e -f -o -u
optional_aggregation_templated_command = """
ssh -o 'StrictHostKeyChecking no' -i ~/.ssh/{{ params.SSH_KEY }} {{ params.USER }}@{{ params.CLUSTER_IP }} 'cd {{ params.HOME_DIR }}/spark/bin; . {{ params.HOME_DIR }}/.bash_profile; {{ params.HOME_DIR }}/run_aggregation.sh {{ execution_date.strftime(params.spark_date_format) }} {{ params.compute_end_dt(execution_date, task_instance).strftime(params.spark_date_format) }} {{ params.granularity_secs }} {{ params.org_ids }}'
"""
aggregate_data_spark_job = BashOperator(
    task_id='aggregate_data_spark_job',
    execution_timeout=timedelta(hours=import_ep_pipeline_aggregation_timeout),
    priority_weight=5,
    pool='ep_data_pipeline_spark_tasks_only',
    bash_command=optional_aggregation_templated_command,
    on_retry_callback=spark_agg_retry,
    params={'CLUSTER_IP': PLATFORM_VARS['ip'],
            'USER': PLATFORM_VARS['ssh_user'],
            'HOME_DIR': PLATFORM_VARS['home_dir'],
            'SSH_KEY': SSH_KEY,
            'granularity_secs': import_airflow_granularity_secs,
            'compute_end_dt': c_params["compute_end_dt"],
            'spark_date_format': SPARK_DATE_FORMAT,
            'org_ids': ORG_IDS},
    dag=dag)
aggregate_data_spark_job.set_upstream(purge_DLQ)
aggregate_data_spark_job.set_upstream(clear_spark_logs)
aggregate_data_spark_job.set_upstream(delete_db_data)
aggregate_data_spark_job.set_upstream(discover_ingest_enabled_orgs)
# Operator : Check for new data in DB
wait_for_new_data_in_db = PythonOperator(
    task_id='wait_for_new_data_in_db',
    provide_context=True,
    python_callable=wait_for_new_data_in_db_simple,
    dag=dag)
wait_for_new_data_in_db.set_upstream(aggregate_data_spark_job)
# Operator : Check for SQS Queue to be drained
wait_for_empty_queue = PythonOperator(
    task_id='wait_for_empty_queue',
    provide_context=True,
    python_callable=wait_for_empty_queue,
    dag=dag)
wait_for_empty_queue.set_upstream(wait_for_new_data_in_db)
# Operator : Kick off message aggregation
aggregate_db_message_job_cmd = '''
(cd /usr/local/agari/cousteau/production/current/; RAILS_ENV=production bundle exec bin/generate-message-aggregates --start-date {{execution_date.strftime('%Y-%m-%d')}})
'''
aggregate_db_message_job = BashOperator(
    task_id='aggregate_db_message_job',
    execution_timeout=timedelta(hours=3),
    pool='ep_data_pipeline_db_msg_agg',
    bash_command=aggregate_db_message_job_cmd,
    dag=dag)
aggregate_db_message_job.set_upstream(wait_for_empty_queue)
# Operator : Get collector message counts (used to be local)
get_collector_msg_counts_cmd = '''
ssh -o 'StrictHostKeyChecking no' -i ~/.ssh/{{ params.SSH_KEY }} {{ params.USER }}@{{ params.CLUSTER_IP }} 'cd {{ params.HOME_DIR }}/spark/bin; . {{ params.HOME_DIR }}/.bash_profile; {{ params.HOME_DIR }}/run_msg_counts.sh {{ execution_date.strftime(params.spark_date_format) }} {{ params.compute_end_dt(execution_date, task_instance).strftime(params.spark_date_format) }} {{ params.granularity_secs }} {{ params.org_ids }}'
'''
get_collector_msg_counts = BashOperator(
    task_id='get_collector_msg_counts',
    execution_timeout=timedelta(hours=1),
    pool='ep_data_pipeline_spark_tasks_only',
    bash_command=get_collector_msg_counts_cmd,
    params={'CLUSTER_IP': PLATFORM_VARS['ip'],
            'USER': PLATFORM_VARS['ssh_user'],
            'HOME_DIR': PLATFORM_VARS['home_dir'],
            'SSH_KEY': SSH_KEY,
            'granularity_secs': import_airflow_granularity_secs,
            'compute_end_dt': c_params["compute_end_dt"],
            'spark_date_format': SPARK_DATE_FORMAT,
            'org_ids': ORG_IDS},
    dag=dag)
get_collector_msg_counts.set_upstream(aggregate_db_message_job)
# Operator : Send Email when flow completes successfully
send_email_notification_flow_successful = PythonOperator(
    task_id='send_email_notification_flow_successful',
    execution_timeout=timedelta(minutes=15),
    pool='ep_data_pipeline_metrics_gathering',
    provide_context=True,
    sla=timedelta(hours=2),
    python_callable=send_email_notification_flow_successful,
    dag=dag)
send_email_notification_flow_successful.set_upstream(get_collector_msg_counts)
# Operator : Enqueue a job for each organization via an SQS Queue
enqueue_alerting_jobs = PythonOperator(
    task_id='enqueue_alerting_jobs',
    provide_context=True,
    depends_on_past=True,
    python_callable=enqueue_alerting_jobs,
    dag=dag)
enqueue_alerting_jobs.set_upstream(wait_for_empty_queue)
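# depends_on_past=True: this task will not run for a given hour until its own previous
# hourly instance has succeeded, so alerting jobs are enqueued strictly in order.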
# Operator : optionally_run_vacuum_analyze
check_for_time_to_run_vacuum_analyze_condition = ShortCircuitOperator(
    task_id='check_for_time_to_run_vacuum_analyze_condition',
    pool='ep_pipeline_vacuum_analyze',
    provide_context=True,
    python_callable=check_for_time_to_run_vacuum_analyze_condition,
    dag=dag)
check_for_time_to_run_vacuum_analyze_condition.set_upstream(get_collector_msg_counts)
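# ShortCircuitOperator: if the callable returns False, the downstream run_vacuum_analyze
# task is skipped for this run.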
# Operator : run vacuum analyze
run_vacuum_analyze = PythonOperator(
    task_id='run_vacuum_analyze',
    pool='ep_pipeline_vacuum_analyze',
    provide_context=True,
    python_callable=run_vacuum_analyze,
    dag=dag)
run_vacuum_analyze.set_upstream(check_for_time_to_run_vacuum_analyze_condition)
# Operator : Send Slack when flow completes successfully
#post_completion_to_slack_job = SlackAPIPostOperator(
# task_id='post_completion_to_slack_job',
# token=slack_api_token,
# channel=ep_ops_slack_channel_name,
# text='{{dag.dag_id}} on {{ ti.hostname }} completed successfully for `{{execution_date}}`',
# icon_url='http://airbnb.io/img/projects/airflow3.png',
# dag=dag)
#post_completion_to_slack_job.set_upstream(send_email_notification_flow_successful)