# Gist by @r39132 - created April 29, 2016
from airflow.models import Variable
from boto.s3.key import Key
from collections import OrderedDict
from datetime import date, datetime, time, timedelta
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from slackclient import SlackClient
from subprocess import Popen, PIPE, STDOUT
import base64
import boto
import boto.s3
import boto.sqs
import boto.sns
import itertools
import json
import logging
import logging.handlers
import os
import pprint
import psycopg2
import random
import re
import requests
import shutil
import smtplib
import sys
import tempfile
import time
import urllib
import uuid
# Set up logging
logger = logging.getLogger('ep_telemetry_pipeline_utils')
logger.setLevel(logging.DEBUG)
handler = logging.handlers.SysLogHandler(address='/dev/log')
logger.addHandler(handler)
# Get the environment - Either EP_STAGE or EP_PROD
ENV = Variable.get("ENV")
if not ENV:
ENV = 'Local Machine'
def str_to_bool(s):
return s in ['True', 'true']
def _is_prod():
logger.info("----- ENV : {}".format(ENV))
if ENV == "EP_STAGE":
logger.info("----- ENV: is NOT PROD ")
return False
else:
logger.info("----- ENV: is PROD ")
return True
# LOAD VARIABLES
import_sqs_queue_name=Variable.get('ep_pipeline_import_sqs_queue_name').strip()
import_ep_pipeline_alerting_queue_name=Variable.get('ep_pipeline_alerting_queue_name').strip()
region = Variable.get('region').strip()
victorops_key=Variable.get('victorops_key').strip()
ep_ops_slack_channel_name=Variable.get('ep_ops_slack_channel_name').strip()
slack_api_token=Variable.get('slack_api_token').strip()
import_sns_topic_arn_for_job_status=Variable.get('ep_pipeline_sns_topic_for_job_status').strip()
import_ep_db_connect_string =Variable.get('ep_pipeline_db_connect_string').strip()
import_ep_pipeline_alert_email_dl = Variable.get("ep_pipeline_alert_email_dl").strip()
import_ep_pipeline_success_email_dl = Variable.get("ep_pipeline_success_email_dl").strip()
import_ep_pipeline_success_email_from = Variable.get("email_from").strip()
import_ep_pipeline_mandrill_creds_dict = json.loads(Variable.get("ep_pipeline_mandrill_creds").strip())
import_spark_cluster_ip=Variable.get('ep_pipeline_spark_cluster_ip').strip()
import_ep_pipeline_model_build_spark_cluster_ip=Variable.get('ep_pipeline_model_build_spark_cluster_ip').strip()
import_airflow_agg_bucket_name=Variable.get('ep_pipeline_agg_bucket_name').strip()
import_airflow_enable_notifications=str_to_bool(Variable.get('ep_pipeline_enable_notifications').strip())
import_airflow_priority_weight=int(Variable.get('ep_pipeline_priority_weight').strip())
import_airflow_importer_failures_bucket_name=Variable.get('ep_pipeline_importer_failures_bucket_name').strip()
import_airflow_importer_metadata_bucket_name=Variable.get('ep_pipeline_importer_metadata_bucket_name').strip()
import_airflow_s3_collector_ingest_bucket_name=Variable.get('ep_pipeline_s3_collector_ingest_bucket_name').strip()
import_airflow_granularity_secs = int(Variable.get('ep_pipeline_granularity_secs').strip())
import_airflow_collector_ingest_delay_secs = int(Variable.get('ep_pipeline_collector_ingest_delay_secs').strip())
import_ep_pipeline_model_build_in_hourly = str_to_bool(Variable.get('ep_pipeline_model_build_in_hourly').strip())
import_ep_terminate_emr_cluster = str_to_bool(Variable.get('ep_terminate_emr_cluster').strip())
import_ep_pipeline_victorops_alerting = str_to_bool(Variable.get('ep_pipeline_victorops_alerting').strip())
import_ep_pipeline_aggregation_timeout = float(Variable.get('ep_pipeline_aggregation_timeout').strip())
import_airflow_start_date_as_lookback_days=Variable.get('ep_pipeline_start_date_as_lookback_days').strip()
import_aggregate_lookback_days=Variable.get('ep_pipeline_aggregate_lookback_days').strip()
healthchecks_io_url=Variable.get('healthchecks_io_url').strip()
CLEAR_LOGS = str_to_bool(Variable.get('ep_pipeline_delete_spark_logs').strip())
PLATFORM = Variable.get('ep_platform_telemetry_pipeline').strip().lower()
PLATFORM_VARS = Variable.get('ep_spark_ssh_config', deserialize_json=True)[PLATFORM]
SSH_KEY = Variable.get('ep_platform_ssh_keys', deserialize_json=True)[PLATFORM]
SLAVES = [ slave.strip() for slave in PLATFORM_VARS['slaves'] ]
import_ep_pipeline_discrepancy_alerting_config = Variable.get("ep_pipeline_discrepancy_alerting_config", deserialize_json=True)
import_ep_ops_slack_alerting_enabled = str_to_bool(Variable.get("ep_ops_slack_alerting_enabled").strip())
import_ep_pipeline_vacuum_analyze_scheduled_hour=int(Variable.get('ep_pipeline_vacuum_analyze_scheduled_hour').strip())
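# Illustrative note (not part of the original gist): the Variable.get() calls
# above assume every key already exists in the Airflow metadata DB; depending
# on the Airflow version, a missing key either returns None or raises. If softer
# behaviour were wanted, Variable.get() also accepts a default_var argument,
# e.g. (the default values below are made up):
#
#   region = Variable.get('region', default_var='us-east-1').strip()
#   healthchecks_io_url = Variable.get('healthchecks_io_url', default_var='').strip()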
pp = pprint.PrettyPrinter(indent=4)
# DEFINE CONSTANTS
ISO_8601_DATE_FORMAT = '%Y-%m-%d'
ISO_8601_DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
MAX_NUM_OF_DLQ_MESSAGES_TO_DISPLAY = 5
MODEL_BUILDING_STALENESS_CRITERIA_SECONDS = 24 * 3600
AIRFLOW_INPUT_DATE_FORMAT = ISO_8601_DATE_TIME_FORMAT
#AIRFLOW_INPUT_DATE_FORMAT = '%Y-%m-%d'
COLLECTOR_VALIDATE_DATE_FORMAT = '%Y%m%d_%H'
SPARK_DATE_FORMAT = '%Y%m%d_%H0000'
_MANDRILL_API_ENDPOINT = 'smtp.mandrillapp.com'
_SLEEP_INTERVAL_SECONDS = 10
_MAX_WAIT_TIME_SECONDS_FOR_NEW_SQS_MESSAGE = _SLEEP_INTERVAL_SECONDS * 30
VACUUM_ANALYZE_RIA_QUERY= '''VACUUM ANALYZE VERBOSE receiver_ip_aggregate;'''
VACUUM_ANALYZE_RDA_QUERY= '''VACUUM ANALYZE VERBOSE receiver_domain_aggregate;'''
VACUUM_ANALYZE_MSG_QUERY= '''VACUUM ANALYZE VERBOSE message;'''
VACUUM_ANALYZE_MSG_AGG_QUERY= '''VACUUM ANALYZE VERBOSE message_aggregate;'''
VACUUM_ANALYZE_DISC_METRICS_QUERY= '''VACUUM ANALYZE VERBOSE telemetry_data_discrepancy;'''
DELETE_RIA_QUERY = '''delete from receiver_ip_aggregate ria where ts >= '%s' and ts < '%s';'''
DELETE_M_QUERY = '''delete from message where ts >= '%s' and ts < '%s';'''
DELETE_RDA_QUERY = '''delete from receiver_domain_aggregate where ts >= '%s' and ts < '%s';'''
DELETE_MESSAGE_AGG_QUERY = '''DELETE FROM message_aggregate WHERE ts >= '%s' AND ts < '%s';'''
DELETE_TELEMETRY_DATA_DISC_QUERY = '''DELETE FROM telemetry_data_discrepancy WHERE execution_date >= '%s' AND execution_date < '%s';'''
_MESSAGE_COUNT_QUERY="SELECT count(*) from message where ts >= '%s' and ts < '%s';"
ALL_ORGS_QUERY = '''SELECT id from organization WHERE not deleted AND ingest_enabled AND (expires_at IS NULL OR expires_at > now());'''
GET_ORGS_INGEST_QUERY = '''SELECT id, ingest_enabled from organization WHERE not deleted AND (expires_at IS NULL OR expires_at > now());'''
# RDA/Message by Org and count
_MESSAGE_ORG_HISTO_QUERY='''SELECT organization_id "org", count(*) from message where ts >= '%s' and ts < '%s' group by org order by org;'''
_NRT_MESSAGE_ORG_HISTO_QUERY="SELECT organization_id \"org\", count(*) FROM nrt_message WHERE ts >= '%s' AND ts < '%s' GROUP BY org ORDER BY org"
_RDA_ORG_HISTO_QUERY='''SELECT organization_id "org", count(*) from receiver_domain_aggregate where ts >= '%s' and ts < '%s' group by org order by org;'''
# RDA/Message by day and count
_MESSAGE_HISTO_QUERY='''SELECT date_trunc('day', ts) "day", count(*) from message where ts >= '%s' and ts < '%s' group by day order by day desc;'''
_NRT_MESSAGE_HISTO_QUERY='''SELECT date_trunc('day', ts) "day", count(*) FROM nrt_message WHERE ts > '%s' AND ts < '%s' GROUP BY day ORDER BY day DESC'''
_RDA_HISTO_QUERY='''SELECT date_trunc('day', ts) "day", count(*) from receiver_domain_aggregate where ts >= '%s' and ts < '%s' group by day order by day desc;'''
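# Illustrative note (not part of the original gist): the DELETE_* and *_HISTO
# queries above are bound with plain '%' interpolation against internally
# generated timestamps, e.g. a minimal sketch:
#
#   start_dt = datetime(2016, 4, 29, 6, 0, 0)
#   end_dt = start_dt + timedelta(seconds=3600)
#   DELETE_RIA_QUERY % (start_dt, end_dt)
#   # => "delete from receiver_ip_aggregate ria where ts >= '2016-04-29 06:00:00' and ts < '2016-04-29 07:00:00';"
#
# If any of these values ever came from user input, psycopg2 placeholders via
# cursor.execute(query, params) would be the safer binding mechanism.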
_EYE_CANDY_LIST = ['http://vignette3.wikia.nocookie.net/looneytunes/images/4/47/Speedy_Gonzales.jpg/revision/latest?cb=20060220031648',
'https://s-media-cache-ak0.pinimg.com/originals/99/38/75/9938756c5636458f6bb553291274de65.jpg',
'https://s-media-cache-ak0.pinimg.com/736x/1e/a1/f1/1ea1f101d9d562ef428211eb53233cae.jpg',
'http://foghornleghornquotes.com/wp-content/uploads/Foghorn-on-the-Farm.jpg',
'http://images6.fanpop.com/image/photos/33200000/marvin-the-martian-looney-tunes-33226447-403-373.jpg',
'http://img.veehd.com/3154746_l2.jpg',
'http://thelivingstonpost.com/wp-content/uploads/2012/10/wile_e_coyote_adhesive_vinyl_decal_sticker5__59675.jpg',
'http://i.ytimg.com/vi/VEFmFMeXV3E/hqdefault.jpg',
'http://m.cdn.blog.hu/vi/vilagevo/image/nyc/brklyn/news_photo_56969_1409171882.jpg',
'http://www.sitcomsonline.com/photopost/data/3429/1foghorn_leghorn-5230.jpg',
'https://d1u1p2xjjiahg3.cloudfront.net/53ead210-1e5b-41b9-bfde-456bdd021e49.jpg',
'http://randomville.com/Image/music/music_features/MK1goodman04_RabbitOfSeville.jpg',
'http://media.animevice.com/uploads/1/11232/634659-287054_72477_bugs_bunny_super_super.jpg',
'https://s-media-cache-ak0.pinimg.com/originals/82/09/66/8209663a65616013a280317df0c216c2.gif',
'http://www.cbc.ca/allinaday/images/littlemermaid.jpg',
]
SPARK_CLUSTER_URI = "spark://{}:7077".format(import_spark_cluster_ip)
# FUNCTIONS
def compute_end_dt(start_dt, task_instance):
"""
Computes end_dt from start_dt
"""
return start_dt+timedelta(seconds=import_airflow_granularity_secs)
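# Worked example (illustrative): assuming an hourly granularity of 3600 seconds,
#   compute_end_dt(datetime(2016, 4, 29, 6, 0, 0), None)
# returns datetime(2016, 4, 29, 7, 0, 0), i.e. the start of the next hourly window.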
def sla_alert_func(dag, task_list, blocking_task_list, slas, blocking_tis):
logger.info('Executing SLA miss callback')
message_type = 'info'
if ENV == 'EP_PROD' and import_ep_pipeline_victorops_alerting:
message_type = 'critical'
# Execute VictorOps alerting
message_name = "{env} - {dag_name} SLA Miss for task `{task_list}`".format(**{
'dag_name': dag.dag_id,
'task_list': task_list,
'env': ENV,
})
userdata ={"message_type": message_type,
"state_message": message_name,
"monitoring_tool": "airflow",
"entity_id":"airflow/telemetry/SLA_miss"}
requests.post('https://alert.victorops.com/integrations/generic/20131114/alert/{}/EP'.format(victorops_key), data=json.dumps(userdata))
# Execute Slack alerting
report_issues_on_slack_util(message_name)
def wait_for_collector_ingest(ds, **kwargs):
# Sleep unless this is being run after the first n mins of the hour
cur_minute_of_hour = datetime.now().time().minute
cur_second_of_minute = datetime.now().time().second
cur_second_of_hour = (cur_minute_of_hour * 60) + cur_second_of_minute
if cur_second_of_hour < import_airflow_collector_ingest_delay_secs:
# Sleep the remainder of time
remainder_secs = import_airflow_collector_ingest_delay_secs - cur_second_of_hour
logger.info("wait_for_collector_ingest : About to wait {} seconds for collector ingest to complete since cur_second_of_hour={}".format(remainder_secs,
cur_second_of_hour))
        time.sleep(remainder_secs)  # sleep only the remaining time, not the full configured delay
logger.info("wait_for_collector_ingest : Finished waiting {} seconds for collector ingest to complete".format(remainder_secs))
def ping_healthchecks_io(ds, **kwargs):
if healthchecks_io_url:
requests.get(healthchecks_io_url)
# Purge DLQ if it has items on it
def purge_DLQ(ds, **kwargs):
import_sqs_dlq_name = "{}-{}".format(import_sqs_queue_name, "DLQ")
was_successful = purge_sqs_queue(import_sqs_dlq_name)
return was_successful
# Check for time to run vacuum analyze
def check_for_time_to_run_vacuum_analyze_condition(ds, **kwargs):
'''
Check whether it is time to run vacuum analyze
'''
dt_hour = kwargs['execution_date'].time().hour
if dt_hour == import_ep_pipeline_vacuum_analyze_scheduled_hour:
return True
else:
return False
# Run vacuum analyze
def run_vacuum_analyze(ds, **kwargs):
'''
Run vacuum analyze
'''
# Set the connection string based on the environment
db_conn_string = import_ep_db_connect_string
db_conn = psycopg2.connect(db_conn_string)
logger.info("----- Successfully Connected to database {}".format(db_conn_string))
cursor = db_conn.cursor()
run_vacuum_analyze_utils(db_conn, cursor)
# close the cursor and connection
cursor.close()
db_conn.close()
return True
# Build models only at the designated hour of the day
def check_for_time_to_build_model_branch_condition(ds, **kwargs):
'''
Check whether it is time to build models
'''
if not import_ep_pipeline_model_build_in_hourly:
return 'prejoin_preagg_dummy_job'
dt_hour = kwargs['execution_date'].time().hour
if dt_hour == 6:
return 'build_sender_models_spark_job'
else:
return 'prejoin_preagg_dummy_job'
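# Illustrative sketch (not part of the original gist) of how a branch callable
# like the one above is presumably wired into the DAG; the operator import path,
# task id, and dag object are assumptions:
#
#   from airflow.operators.python_operator import BranchPythonOperator
#
#   check_model_build_time = BranchPythonOperator(
#       task_id='check_for_time_to_build_model',
#       python_callable=check_for_time_to_build_model_branch_condition,
#       provide_context=True,  # populates kwargs['execution_date']
#       dag=dag)
#
# Both task ids the callable can return ('build_sender_models_spark_job' and
# 'prejoin_preagg_dummy_job') must exist downstream of this operator.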
# Check whether model building was successful
def check_for_successful_model_building_branch_condition(ds, **kwargs):
'''
Check model building is successful
'''
successful = verify_model_building_successful()
if successful:
return 'delete_db_data'
else:
return 'send_sns_notification_model_building_failed'
def send_sns_notification_model_building_failed(ds, **kwargs):
# Return immediately if email is disabled
if not import_airflow_enable_notifications:
return
conn = boto.sns.connect_to_region(region)
    message = 'Airflow EP Data pipeline flow completed with failed model building in the {} environment for: {} GMT/UTC'.format(ENV,
        kwargs['execution_date'].strftime(AIRFLOW_INPUT_DATE_FORMAT))
conn.publish(message=message, subject='Airflow Flow Complete with failed model building', target_arn=import_sns_topic_arn_for_job_status,
)
logger.info("----- Sent SNS about FLOW Completion : model building failure")
return True
# Verify model building is successful
def verify_model_building_successful():
return verify_model_building_successful_util(region,
import_airflow_importer_metadata_bucket_name)
# Verify model building is successful
def verify_model_building_successful_util(aws_region, metadata_bucket_name ):
'''
Check for the presence of certain files to ensure that model building is successful
e.g. Under appropriate metadata bucket, check
* activereputation/reputation-{epoch time}.json
* models/sendermodels-*.avro
    * cousin_domains/cousin_scores.json (optional)
'''
conn = boto.s3.connect_to_region(aws_region)
bucket = conn.get_bucket(metadata_bucket_name)
now_epoch_time = int(time.time())
# Check the activereputation/reputation-{epoch time}.json
bucket_list = bucket.list(prefix='activereputation/reputation')
# TODO : This won't be acceptable once the list of keys grows into the tens of thousands
ordered_list = sorted(bucket_list, key=lambda k: k.last_modified, reverse=True)
most_recent_key = ordered_list[0]
    epoch_search = re.search(r'activereputation/reputation-(.*)\.json', most_recent_key.key, re.IGNORECASE)
common_log_message = '----- verify_model_building_successful : check for recent activereputation/reputation file:'
if epoch_search:
rep_epoch_time = float(epoch_search.group(1))
# don't run model building more than once a day
if rep_epoch_time < (now_epoch_time - MODEL_BUILDING_STALENESS_CRITERIA_SECONDS):
logger.info("{} {} Failed".format(common_log_message, most_recent_key.key))
return False
else:
logger.info("{} {} Succeeded".format(common_log_message, most_recent_key.key))
# Check the models/sendermodels-*.avro
bucket_list = bucket.list(prefix='models/sendermodels')
# TODO : This won't be acceptable once the list of keys grows into the tens of thousands
ordered_list = sorted(bucket_list, key=lambda k: k.last_modified, reverse=True)
most_recent_key = ordered_list[0]
    epoch_search = re.search(r'models/sendermodels-(\d*\.\d*)\D*', most_recent_key.key, re.IGNORECASE)
common_log_message = '----- verify_model_building_successful : check for recent models/sendermodels file:'
if epoch_search:
sm_epoch_time = float(epoch_search.group(1))
if sm_epoch_time < (now_epoch_time - MODEL_BUILDING_STALENESS_CRITERIA_SECONDS):
logger.info("{} {} Failed".format(common_log_message, most_recent_key.key))
return False
else:
logger.info("{} {} Succeeded".format(common_log_message, most_recent_key.key))
# Check the cousin_domains/cousin-scores.json
# time is returned as 'Mon, 02 Nov 2015 23:33:55 GMT'
cd_lmd = bucket.get_key('cousin_domains/cousin_scores.json').last_modified
lmd = datetime.strptime(cd_lmd, '%a, %d %b %Y %H:%M:%S %Z')
    cd_epoch = int(lmd.strftime('%s'))  # cast to int so the staleness comparison below is numeric
now_epoch_time = int(time.time())
common_log_message = '----- verify_model_building_successful : check for recent cousin_domains/cousin_scores.json file:'
if cd_epoch < (now_epoch_time - MODEL_BUILDING_STALENESS_CRITERIA_SECONDS):
logger.info("{} Failed. LMD = {}".format(common_log_message, cd_lmd))
return False
else:
logger.info("{} Succeeded. LMD = {}".format(common_log_message, cd_lmd))
# If we get here, model building is successful
return True
# Delete RDA, RIA, and Message records in the table for the target date range
def delete_db_data(ds, **kwargs):
'''
Delete data from the RDA, RIA, and Message tables
'''
# Format and compute dates for the time bounds
start_dt, end_dt = get_start_and_end_datetime(**kwargs)
# Delete data
delete_db_data_util(import_ep_db_connect_string,
start_dt,
end_dt)
return True
def delete_db_discrepancy_data(db_conn_string, execution_date):
'''
Delete data discrepancy
'''
# Get start and end dates
start_date, end_date = get_start_and_end_datetime(execution_date=execution_date)
# Set the connection string based on the environment
db_conn = psycopg2.connect(db_conn_string)
logger.info("----- Successfully Connected to database {}".format(db_conn_string))
cursor = db_conn.cursor()
DELETE_TELEMETRY_DATA_DISC_QUERY_BOUND = DELETE_TELEMETRY_DATA_DISC_QUERY % (start_date, end_date)
# Execute queries
logger.info("----- Executing the following query against the db : {}".format(DELETE_TELEMETRY_DATA_DISC_QUERY_BOUND))
cursor.execute(DELETE_TELEMETRY_DATA_DISC_QUERY_BOUND)
db_conn.commit() # close the transaction
# Always vacuum analyze
db_conn.autocommit = True
logger.info("----- Executing the following query against the db : {}".format(VACUUM_ANALYZE_DISC_METRICS_QUERY))
cursor.execute(VACUUM_ANALYZE_DISC_METRICS_QUERY)
db_conn.autocommit = False
# close the cursor and connection
cursor.close()
db_conn.close()
return True
# Utility method to delete RDA, RIA, and Message records in the table for the target date range
def delete_db_data_util(db_conn_string, start_date, end_date, vacuum_analyze=False, reload_data=False):
'''
Delete data from the RDA, RIA, and Message tables
'''
# Set the connection string based on the environment
db_conn = psycopg2.connect(db_conn_string)
logger.info("----- Successfully Connected to database {}".format(db_conn_string))
cursor = db_conn.cursor()
DELETE_RIA_QUERY_BOUND = DELETE_RIA_QUERY % (start_date, end_date)
DELETE_M_QUERY_BOUND = DELETE_M_QUERY % (start_date, end_date)
DELETE_RDA_QUERY_BOUND = DELETE_RDA_QUERY % (start_date, end_date)
# Execute queries
logger.info("----- Executing the following query against the db : {}".format(DELETE_RIA_QUERY_BOUND))
cursor.execute(DELETE_RIA_QUERY_BOUND)
logger.info("----- Executing the following query against the db : {}".format(DELETE_M_QUERY_BOUND))
cursor.execute(DELETE_M_QUERY_BOUND)
logger.info("----- Executing the following query against the db : {}".format(DELETE_RDA_QUERY_BOUND))
cursor.execute(DELETE_RDA_QUERY_BOUND)
db_conn.commit() # close the transaction
# Do optional Vacuum Analyze
if vacuum_analyze:
run_vacuum_analyze_utils(db_conn, cursor)
# close the cursor and connection
cursor.close()
db_conn.close()
return True
# Function to execute vacuum analyze
def run_vacuum_analyze_utils(db_conn, cursor):
db_conn.autocommit = True
logger.info("----- Executing the following query against the db : {}".format(VACUUM_ANALYZE_RDA_QUERY))
cursor.execute(VACUUM_ANALYZE_RDA_QUERY)
logger.info("----- Executing the following query against the db : {}".format(VACUUM_ANALYZE_RIA_QUERY))
cursor.execute(VACUUM_ANALYZE_RIA_QUERY)
logger.info("----- Executing the following query against the db : {}".format(VACUUM_ANALYZE_MSG_QUERY))
cursor.execute(VACUUM_ANALYZE_MSG_QUERY)
logger.info("----- Executing the following query against the db : {}".format(VACUUM_ANALYZE_MSG_AGG_QUERY))
cursor.execute(VACUUM_ANALYZE_MSG_AGG_QUERY)
db_conn.autocommit = False
def check_for_sqs_message_branch_condition(ds, **kwargs):
conn = boto.sqs.connect_to_region(region)
logger.info("----- Connecting to SQS Queue : {}".format(import_sqs_queue_name))
q = conn.get_queue(import_sqs_queue_name)
rs = q.get_messages()
found_message = True
start_time = datetime.now()
while rs is None or len(rs) <= 0:
elapsed_time_seconds = (datetime.now() - start_time).seconds
if elapsed_time_seconds >= _MAX_WAIT_TIME_SECONDS_FOR_NEW_SQS_MESSAGE:
found_message = False
            break
logger.info("----- WAITING FOR A MESSAGE on SQS Queue : {}".format(import_sqs_queue_name))
time.sleep(_SLEEP_INTERVAL_SECONDS)
rs = q.get_messages()
if found_message:
logger.info("----- FOUND A MESSAGE on SQS Queue : {}".format(import_sqs_queue_name))
return 'wait_for_new_data_in_db'
else:
logger.info("----- GAVE UP ON WAITING FOR A MESSAGE SQS Queue : {}".format(import_sqs_queue_name))
return 'send_sns_notification_no_spark_data'
def wait_for_empty_queue(ds, **kwargs):
conn = boto.sqs.connect_to_region(region)
q = conn.get_queue(import_sqs_queue_name)
q_size = q.count()
while q_size > 0:
logger.info("----- WAITING FOR Queue {} to Drain : Messages remaining : {}".format(import_sqs_queue_name, q_size))
q = conn.get_queue(import_sqs_queue_name)
q_size = q.count()
time.sleep(_SLEEP_INTERVAL_SECONDS)
    # Once the queue drains, there may still be many messages in flight, especially if we have 20 ASG instances launched!
attr = q.get_attributes()
inflight_message_count = int(attr['ApproximateNumberOfMessagesNotVisible'])
while inflight_message_count > 0:
logger.info("----- WAITING FOR Queue {} to Drain : In-flight Messages remaining : {}".format(import_sqs_queue_name,
inflight_message_count))
q = conn.get_queue(import_sqs_queue_name)
attr = q.get_attributes()
inflight_message_count = int(attr['ApproximateNumberOfMessagesNotVisible'])
time.sleep(_SLEEP_INTERVAL_SECONDS)
logger.info("----- Queue {} Empty".format(import_sqs_queue_name))
return True
# Simplified version of the db data wait method that
# only checks that the row count is > 0
def wait_for_new_data_in_db_simple(ds, **kwargs):
db_conn = psycopg2.connect(import_ep_db_connect_string)
logger.info("----- Successfully Connected to database {}".format(import_ep_db_connect_string))
cursor = db_conn.cursor()
# Establish a base line
start_dt, end_dt = get_start_and_end_datetime(**kwargs)
_MESSAGE_COUNT_QUERY_WITH_TS = _MESSAGE_COUNT_QUERY % (start_dt, end_dt)
logger.info("----- Executing the following query against the db : {}".format(_MESSAGE_COUNT_QUERY_WITH_TS))
cursor.execute(_MESSAGE_COUNT_QUERY_WITH_TS)
result = cursor.fetchone()
initial_row_count = int(result[0])
db_conn.commit() #close the transaction
logger.info("----- FOUND A NEW DATA in DB (Simplified) : (row count) = {}".format(initial_row_count))
# close the cursor and connection
cursor.close()
db_conn.close()
return initial_row_count>0
# Waits for new data in the db and expect to observe data being added
def wait_for_new_data_in_db(ds, **kwargs):
db_conn = psycopg2.connect(import_ep_db_connect_string)
logger.info("----- Successfully Connected to database {}".format(import_ep_db_connect_string))
cursor = db_conn.cursor()
# Establish a base line
start_dt, end_dt = get_start_and_end_datetime(**kwargs)
_MESSAGE_COUNT_QUERY_WITH_TS = _MESSAGE_COUNT_QUERY % (start_dt, end_dt)
logger.info("----- Executing the following query against the db : {}".format(_MESSAGE_COUNT_QUERY_WITH_TS))
cursor.execute(_MESSAGE_COUNT_QUERY_WITH_TS)
result = cursor.fetchone()
initial_row_count = int(result[0])
db_conn.commit() #close the transaction
found_new_data = None
# Detect a change in the record count
while not found_new_data:
logger.info("----- WAITING FOR A NEW DATA in DB : base count = {}".format(initial_row_count))
time.sleep(_SLEEP_INTERVAL_SECONDS)
cursor.execute(_MESSAGE_COUNT_QUERY_WITH_TS)
result = cursor.fetchone()
new_row_count = int(result[0])
        if new_row_count > initial_row_count:
found_new_data = True
db_conn.commit() #close the transaction
logger.info("----- FOUND A NEW DATA in DB : (base count ==> new count) = ({} ==> {})".format(initial_row_count, new_row_count))
# close the cursor and connection
cursor.close()
db_conn.close()
return True
def send_sns_notification_no_spark_data(ds, **kwargs):
# Return immediately if email is disabled
if not import_airflow_enable_notifications:
return
date_format = AIRFLOW_INPUT_DATE_FORMAT
conn = boto.sns.connect_to_region(region)
message = 'Airflow EP Data pipeline flow completed with no new Spark data in the {} environment for : {} GMT/UTC'.format(ENV,
kwargs['execution_date'].strftime(date_format))
conn.publish(message=message, subject='Airflow Flow Complete with No New Data', target_arn=import_sns_topic_arn_for_job_status,
)
logger.info("----- Sent SNS about FLOW Completion : no new spark data")
return True
def get_record_org_histos_for_tables(execution_date):
# Establish a base line
start_dt, end_dt = get_start_and_end_datetime(execution_date=execution_date)
date_format = AIRFLOW_INPUT_DATE_FORMAT
start_date_string = start_dt.strftime(date_format)
end_date_string = end_dt.strftime(date_format)
_RDA_ORG_HISTO_QUERY_WITH_TS = _RDA_ORG_HISTO_QUERY % (start_date_string, end_date_string)
_MESSAGE_ORG_HISTO_QUERY_WITH_TS = _MESSAGE_ORG_HISTO_QUERY % (start_date_string, end_date_string)
_NRT_MESSAGE_ORG_HISTO_QUERY_WITH_TS = _NRT_MESSAGE_ORG_HISTO_QUERY % (start_date_string, end_date_string)
logger.info("----- Executing the following queries against the db : {} and {} and {}".format(_RDA_ORG_HISTO_QUERY_WITH_TS,
_MESSAGE_ORG_HISTO_QUERY_WITH_TS,
_NRT_MESSAGE_ORG_HISTO_QUERY_WITH_TS))
return get_record_histos_for_tables(_RDA_ORG_HISTO_QUERY_WITH_TS,
'RDA_ORG_HISTO',
_MESSAGE_ORG_HISTO_QUERY_WITH_TS,
'MESSAGE_ORG_HISTO',
_NRT_MESSAGE_ORG_HISTO_QUERY_WITH_TS,
'NRT_MESSAGE_ORG_HISTO')
def get_record_date_histos_for_tables(start_dt, end_dt, reload_data=False):
if reload_data:
# For queries, we need to cap at 1 day above with "<" condition
end_date = end_dt + timedelta(days=1)
else:
# For queries, we need to cap at 1 hour above with "<" condition
end_date = end_dt + timedelta(hours=1)
message_query = _MESSAGE_HISTO_QUERY % (start_dt, end_date)
rda_query = _RDA_HISTO_QUERY % (start_dt, end_date)
nrt_message_query = _NRT_MESSAGE_HISTO_QUERY % (start_dt, end_date)
return get_record_histos_for_tables(rda_query,
'RDA_HISTO',
message_query,
'MESSAGE_HISTO',
nrt_message_query,
'NRT_MESSAGE_HISTO')
def get_record_histos_for_tables(rda_histo_query,
rda_histo_key_name,
message_histo_query,
message_histo_key_name,
nrt_message_histo_query,
nrt_message_histo_key_name):
'''
This returns RDA/Message histos
'''
db_conn = psycopg2.connect(import_ep_db_connect_string)
logger.info("----- Successfully Connected to database {}".format(import_ep_db_connect_string))
# Open a cursor
cursor = db_conn.cursor()
# Create the queries
logger.info("----- Executing the following queries against the db : {} & {}".format(message_histo_query,
rda_histo_query))
# Execute the queries - first message histo count
cursor.execute(message_histo_query)
message_histo_records = cursor.fetchall()
db_conn.commit() #close the transaction
# Execute the queries - next rda histo count
cursor.execute(rda_histo_query)
rda_histo_records = cursor.fetchall()
db_conn.commit() #close the transaction
# nrt_message histo count
cursor.execute(nrt_message_histo_query)
nrt_message_histo_records = cursor.fetchall()
db_conn.commit()
# close the cursor and connection
cursor.close()
db_conn.close()
return {message_histo_key_name:message_histo_records,
rda_histo_key_name:rda_histo_records,
nrt_message_histo_key_name:nrt_message_histo_records}
# Get collector histos from s3 for validation
def get_collector_histos(**kwargs):
# Get the list of ingest disabled orgs to skip for non-prod envs
ingest_disabled_orgs = set()
if not _is_prod():
orgs_ingest_lookup_pre_run = kwargs['task_instance'].xcom_pull(task_ids='discover_ingest_enabled_orgs')
for key, pre_value in orgs_ingest_lookup_pre_run.iteritems():
if not pre_value:
ingest_disabled_orgs.add(key)
# Get collector file counts
#collector_dict = get_collector_file_message_counts_per_org(ingest_disabled_orgs,
# kwargs['execution_date'])
collector_dict = get_collector_file_message_counts_per_org(kwargs['execution_date'])
converted_dict = {}
for k,v in collector_dict.iteritems():
converted_dict[long(k)] = long(v)
return converted_dict
# Common logic shared by the agg_avro and agg_avro_skipped histo methods
def agg_avro_histos_common(execution_date, suffix):
conn = boto.s3.connect_to_region(region)
bucket = conn.get_bucket(import_airflow_agg_bucket_name)
# Compute the agg start and end dates using the lookback
# NOTE : params.import_aggregate_lookback_days is non-positive
start_dt, end_dt = get_start_and_end_datetime(execution_date=execution_date)
agg_start_date_epoch = start_dt.strftime(SPARK_DATE_FORMAT)
agg_end_date_epoch = end_dt.strftime(SPARK_DATE_FORMAT)
logger.info('agg_avro_histos_common Stats located at domain_aggregate-{}-{}'.format(agg_start_date_epoch,
agg_end_date_epoch))
rs = bucket.list("domain_aggregate-{}-{}".format(agg_start_date_epoch,
agg_end_date_epoch))
# Since there may be many runs, find the most recent run
most_recent_lmd = None
most_recent_key = None
for key in rs:
if key.key.endswith(suffix):
if not most_recent_lmd or most_recent_lmd < key.last_modified:
most_recent_lmd = key.last_modified
most_recent_key = key
return (most_recent_lmd, most_recent_key)
# Get agg_avro histos from s3 for validation
def get_agg_avro_histos(execution_date):
most_recent_lmd, most_recent_key = agg_avro_histos_common(execution_date,
"summary.json")
summary = None
if most_recent_key:
summary = json.loads(most_recent_key.get_contents_as_string())
# We want { 1: 11534, 6: 87200, 7: 16446 }
org_to_message_count_dict = dict()
    if summary and 'aggregated_counts_by_org_id' in summary:  # guard against a missing summary file
filtered_by_org_id_dict = summary['aggregated_counts_by_org_id']
# At this point, we have {u'12': 0, u'21': 0, u'1': 2713, u'5': 0, u'7': 2181, u'6': 0}
for key in filtered_by_org_id_dict:
org_to_message_count_dict[int(key)] = filtered_by_org_id_dict[key]
# Print out the contents
logger.info('Agg Avro Summary Stats located at %s' % (most_recent_key))
logger.info("Agg Avro Summary Stats : org_to_message_count_dict = %s" % org_to_message_count_dict )
return org_to_message_count_dict
def define_message_alert_job(org_id, start_dt, interval):
job_id = str(uuid.uuid4())
job = {'job_class': 'MessageAlertJob',
'job_id': job_id,
'arguments': [org_id, start_dt, interval]}
return (job_id, json.dumps(job), 0)
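# Illustrative note: the (id, body, delay_seconds) tuple returned above is the
# shape boto's SQS Queue.write_batch() expects. A single job for a hypothetical
# org 7 would serialize to something like:
#
#   ('c1f0...-uuid',
#    '{"job_class": "MessageAlertJob", "job_id": "c1f0...-uuid", "arguments": [7, "2016-04-29T06:00:00", 3600]}',
#    0)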
# Enqueue a job over SQS to check for Alerts
def enqueue_alerting_jobs(ds, **kwargs):
db_conn = psycopg2.connect(import_ep_db_connect_string)
logger.info("----- Successfully Connected to database {}".format(import_ep_db_connect_string))
cursor = db_conn.cursor()
logger.info("----- Executing the following query against the db : {}".format(ALL_ORGS_QUERY))
cursor.execute(ALL_ORGS_QUERY)
org_ids = [r[0] for r in cursor.fetchall()]
cursor.close()
db_conn.close()
start_dt = kwargs['execution_date'].strftime(ISO_8601_DATE_TIME_FORMAT)
interval = int(import_airflow_granularity_secs)
jobs = map(lambda org: define_message_alert_job(org, start_dt, interval), org_ids)
job_batches = [jobs[i:i + 10] for i in range(0, len(jobs), 10)]
conn = boto.sqs.connect_to_region(region)
logger.info("----- Writing MessageAlertJobs to queue {}".format(import_ep_pipeline_alerting_queue_name))
q = conn.get_queue(import_ep_pipeline_alerting_queue_name)
for batch in job_batches:
logger.info("----- Enqueuing job batch for orgs :{}".format(batch))
status = q.write_batch(batch)
if status.errors:
logger.error("Batch failures {}".format(status.errors))
# Get agg_avro skipped by skip type histos from s3 for validation
def get_agg_avro_skipped_by_type_histos(execution_date):
most_recent_lmd, most_recent_key = agg_avro_histos_common(execution_date,
"summary.json")
# Set the summary contents from the most recent key
summary = None
if most_recent_key:
summary = json.loads(most_recent_key.get_contents_as_string())
# We want { 1: {"internal": 30}, 14: {"internal": 31737, "invalid_hdr_from": 1, "invalid_ip": 2}}
org_to_skip_type_to_count_dict = dict()
    if summary and 'skip_type_counts' in summary:  # guard against a missing summary file
skip_counts_by_org_and_skip_type_dict = summary['skip_type_counts']
# At this point, we have {"14": {"internal": 31737, "invalid_hdr_from": 1, "invalid_ip": 2}}
for key in skip_counts_by_org_and_skip_type_dict:
org_to_skip_type_to_count_dict[int(key)] = skip_counts_by_org_and_skip_type_dict[key]
# Print out the contents
logger.info('Agg Avro Skipped Stats located at %s' % (most_recent_key))
logger.info("Agg Avro Skipped Stats : org_to_skip_type_to_count_dict = %s" % org_to_skip_type_to_count_dict)
return org_to_skip_type_to_count_dict
# Get agg_avro skipped histos from s3 for validation
def get_agg_avro_skipped_histos(execution_date):
most_recent_lmd, most_recent_key = agg_avro_histos_common(execution_date,
"summary.json")
# Set the summary contents from the most recent key
summary = None
if most_recent_key:
summary = json.loads(most_recent_key.get_contents_as_string())
# We want { 1: 11534, 6: 87200, 7: 16446 }
org_to_message_count_dict = dict()
    if summary and 'deltas_by_org_id' in summary and 'filtered_by_org_id' in summary['deltas_by_org_id']:  # guard against a missing summary file
        filtered_by_org_id_dict = summary['deltas_by_org_id']['filtered_by_org_id']
# At this point, we have {u'12': 0, u'21': 0, u'1': 2713, u'5': 0, u'7': 2181, u'6': 0}
for key in filtered_by_org_id_dict:
org_to_message_count_dict[int(key)] = filtered_by_org_id_dict[key]
# Print out the contents
logger.info('Agg Avro Skipped Stats located at %s' % (most_recent_key))
logger.info("Agg Avro Skipped Stats : org_to_message_count_dict = %s" % org_to_message_count_dict )
return org_to_message_count_dict
# Collect DLQ Stats and information
def collects_DLQ_stats_simple():
# Get the DLQ Count and return immediately if there are no messages
url_list = []
conn = boto.sqs.connect_to_region(region)
import_sqs_dlq_name = "{}-{}".format(import_sqs_queue_name, "DLQ")
logger.info("----- Checking DLQ named {} for messages".format(import_sqs_dlq_name))
q = conn.get_queue(import_sqs_dlq_name)
dlq_q_size = q.count()
if dlq_q_size == 0:
return (dlq_q_size, url_list)
logger.info("----- Found DLQs dlq_count: {} messages on DLQ named {}".format(dlq_q_size,
import_sqs_dlq_name))
# Get a single message and figure out the folder path leading to the object
rs = q.get_messages(num_messages=1)
if len(rs) > 0:
m = rs[0]
body = json.loads(m.get_body())
message_internal = json.loads(body["Message"])
records = message_internal["Records"]
# Grab the first record
if len(records) > 0:
record = records[0]
bucket = record['s3']['bucket']['name']
key = record['s3']['object']['key']
logger.info("collects_DLQ_stats : bucket={} and key={}".format(bucket, key))
# Remove the trailing '/' if it exists
if key.endswith('/'):
key = key[:-1]
new_key = os.path.dirname(key)
url = 'https://console.aws.amazon.com/s3/home?region={}&bucket={}&prefix={}'.format(region,
import_airflow_importer_failures_bucket_name,
new_key)
url_list.append(url)
logger.info("collects_DLQ_stats : URL LIST = %s" % (url_list))
return (dlq_q_size, url_list)
# Collect stats for validation
def collects_stats_for_validation(**kwargs):
'''
Report data discrepancies at each of the stages of the EP data pipeline:
* Stage 1 : Customer --> Collector files
* Stage 2 : Collector files --> run_aggr --> agg files
* Stage 3 : agg files --> importer --> DB
For each data source, we want org:{rda_count, message count}, though the first iteration
might just contain message counts.
'''
# Pre-Stage 1 : Check Collector input counts
collector_org_counts_dict = get_collector_histos(**kwargs)
# Post-Stage 2 : Check Agg counts and skipped counts
agg_avro_org_counts_dict = get_agg_avro_histos(kwargs['execution_date'])
agg_avro_org_skipped_counts_dict = get_agg_avro_skipped_histos(kwargs['execution_date'])
agg_avro_org_to_skip_type_to_count_dict = get_agg_avro_skipped_by_type_histos(kwargs['execution_date'])
# Post-Stage 3 : Check DB counts
db_org_counts_dict = get_record_org_histos_for_tables(kwargs['execution_date'])
return {'collector_org_counts_dict':collector_org_counts_dict,
'db_org_counts_dict':db_org_counts_dict,
'agg_avro_org_counts_dict':agg_avro_org_counts_dict,
'agg_avro_org_skipped_counts_dict':agg_avro_org_skipped_counts_dict,
'agg_avro_org_to_skip_type_to_count_dict':agg_avro_org_to_skip_type_to_count_dict
}
# Validate the pipeline stages
def validate_stages(**kwargs):
# Create a date object from this String
date_format = '%Y-%m-%d'
# Validate the stages
return validate_stages_util(**kwargs)
# get a comma-delimited string of org_ids to send to spark jobs
def get_active_org_ids_string():
org_dict = get_org_ingest_dict()
org_ids = sorted([ int(id_) for id_ in org_dict.keys() if org_dict[id_] ]) # sorted for easier debugging
return ','.join([ str(id_) for id_ in org_ids ])
def get_org_ingest_dict():
db_conn = psycopg2.connect(import_ep_db_connect_string)
logger.info("----- Successfully Connected to database {}".format(import_ep_db_connect_string))
cursor = db_conn.cursor()
logger.info("----- Executing the following query against the db : {}".format(GET_ORGS_INGEST_QUERY))
cursor.execute(GET_ORGS_INGEST_QUERY)
result = dict()
for r in cursor.fetchall():
result[r[0]] = r[1]
cursor.close()
db_conn.close()
return result
def validate_stages_util(**kwargs):
stages_org_counts_dict = collects_stats_for_validation(**kwargs)
pp.pprint(stages_org_counts_dict)
# We start with list of tuples (org_id, message_count): one for db
collector_org_message_count_dict = stages_org_counts_dict['collector_org_counts_dict']
db_org_message_count_tuples = stages_org_counts_dict['db_org_counts_dict']['MESSAGE_ORG_HISTO']
db_org_nrt_message_count_tuples = stages_org_counts_dict['db_org_counts_dict']['NRT_MESSAGE_ORG_HISTO']
agg_avro_org_counts_dict = stages_org_counts_dict['agg_avro_org_counts_dict']
agg_avro_org_skipped_counts_dict = stages_org_counts_dict['agg_avro_org_skipped_counts_dict']
agg_avro_org_to_skip_type_to_count_dict = stages_org_counts_dict['agg_avro_org_to_skip_type_to_count_dict']
# Make a set of the different types of skips seen. These will be additional columns per org.
# This set of columns is dynamic because we want to support anything returned by the
# aggregation/scoring code
skip_types_seen = set()
for org_id, skip_type_and_count_dict in agg_avro_org_to_skip_type_to_count_dict.items():
for skip_type, count in skip_type_and_count_dict.items():
skip_types_seen.add(skip_type)
logger.info("skip_types_seen = {}".format(skip_types_seen))
# Note : I won't be providing 0-value defaults for the skip type dicts
# There may be elements in the upstream lists that are not in the downstream lists.
# Assumption : Upstream lists are more complete in terms of org representation.
# Downstream bugs can cause duplicate counting for an org, but they won't cause an org
# to be fabricated from thin air
# Create dicts, where the key is org_id, value is message counts
db_message_counts_keyed_by_org_dict = dict(db_org_message_count_tuples)
db_nrt_message_counts_keyed_by_org_dict = dict(db_org_nrt_message_count_tuples)
# Look up the pre and post-agg run orgs to determine which one were
# ingest-enabled for the entire run. For those, we will compute discrepancies
orgs_ingest_lookup_post_run = get_org_ingest_dict()
orgs_ingest_lookup = get_finalized_list_of_ingest_enabled_orgs(orgs_ingest_lookup_post_run, **kwargs)
# Create a dict of tuples called message_stats_by_stage. The key is org_id, the value is a tuple
# where each member represents counts for a stage
message_stats_by_stage = dict()
for col_org_id, col_msg_cnt in collector_org_message_count_dict.iteritems():
# If an org is present in the DB, use its count, else use 0
db_msg_cnt = 0
if col_org_id in db_message_counts_keyed_by_org_dict:
db_msg_cnt = db_message_counts_keyed_by_org_dict[col_org_id]
db_nrt_msg_cnt = 0
if col_org_id in db_nrt_message_counts_keyed_by_org_dict:
db_nrt_msg_cnt = db_nrt_message_counts_keyed_by_org_dict[col_org_id]
# If an org is present in the agg_avro, use its count, else use 0
agg_avro_msg_cnt = 0
if col_org_id in agg_avro_org_counts_dict:
agg_avro_msg_cnt = agg_avro_org_counts_dict[col_org_id]
# If an org is present in the agg_avro_skipped, use its count, else use 0
agg_avro_msg_skipped_cnt = 0
if col_org_id in agg_avro_org_skipped_counts_dict:
agg_avro_msg_skipped_cnt = agg_avro_org_skipped_counts_dict[col_org_id]
# Put all results into a tuple
message_stats_by_stage[col_org_id] = (col_msg_cnt,
agg_avro_msg_cnt,
db_msg_cnt,
agg_avro_msg_skipped_cnt,
skip_types_seen,
agg_avro_org_to_skip_type_to_count_dict,
db_nrt_msg_cnt)
# Sort Dictionary by key
message_stats_by_stage_ord_by_org = OrderedDict(sorted(message_stats_by_stage.items()))
# Print out some info to logging
for org_id, message_counts in message_stats_by_stage_ord_by_org.iteritems():
logger.info("ORG:{} ==> Collector Count: {}, Agg Avro Count: {}, DB Count: {}, Agg Avro Skipped: {}" \
.format(str(org_id),
str(message_counts[0]),
str(message_counts[1]),
str(message_counts[2]),
str(message_counts[3])))
        # If the collector count doesn't reconcile against the db + skipped counts
        if message_counts[0] != message_counts[2] + message_counts[3]:
discrepancy = message_counts[0] - message_counts[2] - message_counts[3]
discrepancy_pct = perc(discrepancy, message_counts[0])
if orgs_ingest_lookup[org_id]:
le_alert_dict = { 'org_id': org_id, 'collector_count': message_counts[0], 'db_count': message_counts[2], 'discrepancy_count': discrepancy, 'discrepancy_pct': discrepancy_pct }
alert_str = "Telemetry pipeline discrepancy - " + json.dumps(le_alert_dict)
logger.warning(alert_str)
return message_stats_by_stage_ord_by_org
# This method combines pre-run and post-run views of the ingest-enabled orgs
def get_finalized_list_of_ingest_enabled_orgs(orgs_ingest_lookup_post_run, **kwargs):
'''
    For an org to be included in the finalized list, it needs to have been enabled for ingest
during both pre- and post-run. We really mean pre- and post-aggregation run here!
'''
orgs_ingest_lookup_pre_run = kwargs['task_instance'].xcom_pull(task_ids='discover_ingest_enabled_orgs')
finalized_dict = dict()
for key, pre_value in orgs_ingest_lookup_pre_run.iteritems():
# Handle case where an org expires or is deleted since the pre_run lookup. We don't want to get a key error
# on orgs_ingest_lookup_post_run[key]
post_value = False
if key in orgs_ingest_lookup_post_run:
post_value = orgs_ingest_lookup_post_run[key]
logger.info("Ingest Enabled Orgs : key={}, pre_value={}, post_value={}".format(key,
pre_value,
post_value))
if pre_value and post_value:
finalized_dict[key] = True
else:
finalized_dict[key] = False
return finalized_dict
def generate_successful_email_util(start_dt, reload_end_dt, **kwargs):
logger.info("Starting to generate success email")
# Return immediately if email is disabled
if not import_airflow_enable_notifications:
logger.info("Skipping success email because airflow notifications are disabled")
return
me = import_ep_pipeline_success_email_from
you = import_ep_pipeline_success_email_dl
# Create message container - the correct MIME type is multipart/alternative.
msg = MIMEMultipart('alternative')
msg['Subject'] = "Airflow EP Data Load Complete [{}]".format(ENV)
msg['From'] = me
msg['To'] = you
if 'reload_data' in kwargs:
logger.info("Email notification being generated for reload data script")
from_dag = False
data_path = 'Reload Data Script'
url = ''
rld_any_dlq_messages = kwargs['any_dlq_messages']
rld_all_data_in_db = kwargs['all_data_in_db']
# Craft a status message for reload_data
if not rld_any_dlq_messages and rld_all_data_in_db:
rld_status_message = "Success"
else:
failure_reasons = []
if not rld_all_data_in_db:
failure_reasons.append("Some Dates Have No Data!")
if rld_any_dlq_messages:
failure_reasons.append("DLQ Messages Found")
rld_status_message = "Failure : {}".format(failure_reasons)
logger.info("Success email being generated for Reload Data Script")
else:
logger.info("Email notification being generated for airflow run")
from_dag = True
data_path = 'Airflow'
base_url = kwargs['conf'].get('webserver', 'base_url')
dag_id = kwargs['dag'].dag_id
url = "<a href='{}/admin/airflow/tree?num_runs=25&root=&dag_id={}'>Airflow</a>".format(base_url, dag_id)
logger.info("Success email being generated for Airflow run")
# Create the body of the message (a plain-text and an HTML version).
text = "Hi EP Folks!\nThe EP Data Pipeline ({}) Loaded Data in the {} environment for ".format(data_path, ENV)
if (reload_end_dt):
text += "period: {} to {} GMT/UTC. ".format(start_dt, reload_end_dt)
text += "\n The run status is {}".format(rld_status_message)
text += "\n The run included options: {}".format(kwargs['reload_opts'])
else:
text += "day: {} GMT/UTC".format(start_dt)
# Build an html table for rda and message counts for the past N days in the DB
    histo_end_date = start_dt if reload_end_dt is None else reload_end_dt
histos_dict = get_record_date_histos_for_tables(start_dt, histo_end_date, reload_data=(not from_dag))
html_table = """\
<table border='1' style='width:100%'>
<tr><th>Day</th><th>RDAs</th><th>Messages</th></tr>
"""
for msg_row, rda_row in zip(histos_dict['MESSAGE_HISTO'], histos_dict['RDA_HISTO']):
html_table +="<tr><td>{}</td><td>{}</td><td>{}</td></tr>".format(rda_row[0], rda_row[1], msg_row[1])
    html_table += "</table>"
# Build a list of DLQ Exceptions
dlq_q_size, dlq_msg_exception_s3_url_list = collects_DLQ_stats_simple()
# Report via Slack
report_issues_on_slack_for_DLQs(dlq_q_size, dlq_msg_exception_s3_url_list, **kwargs)
html_DLQ_exception_list = ''
if len(dlq_msg_exception_s3_url_list) > 0:
        html_DLQ_exception_list = """
        <p><strong>Some DLQ Exceptions (total DLQ Messages = {})</strong><br></p>
        """.format(dlq_q_size)
i = 1
for dlq_exception_url in dlq_msg_exception_s3_url_list:
link = """<p><a href="{}/">DLQ Exceptions</a></p>""".format(dlq_exception_url)
i = i+1
html_DLQ_exception_list += link
# Build an html table of message_stats_by_stage
orgs_ingest_lookup = get_org_ingest_dict()
html_table_message_stats_by_stage = ''
if from_dag:
html_table_message_stats_by_stage = 'From Airflow start_date = {}'.format(start_dt)
message_stats_by_stage = validate_stages(**kwargs)
# Insert discrepancy metrics in DB!
insert_discrepancy_metrics_in_db(start_dt, message_stats_by_stage, dlq_q_size, orgs_ingest_lookup, **kwargs)
html_table_message_stats_by_stage = """\
<p><strong>Message Counts by Org and by Pipeline Stage Output</strong><br></p>
<p>
<table border="1" style="width:100%">
<tr><th>Org_ID</th><th>Collector Msgs</th><th>Agg Msgs</th><th>DB Msgs</th><th>Discrepancy</th><th>Discr %</th><th>NRT DB Msgs</th><th>NRT Discrepancy</th><th>NRT Discr %</th><th>Skipped</th><th>Notes</th></tr>
"""
for org_id, message_counts in message_stats_by_stage.iteritems():
pipeline_stats = compute_pipeline_stats_for_org(org_id, message_counts, orgs_ingest_lookup)
html_table_message_stats_by_stage += """\
<tr>
<td>{org_id}</td>
<td>{collector_text}</td>
<td>{agg_count}</td>
<td>{db_message_count}</td>
<td>{discrepancy_count}</td>
<td>{discrepancy_pct}</td>
<td>{db_nrt_message_count}</td>
<td>{nrt_discrepancy_count}</td>
<td>{nrt_discrepancy_pct}</td>
<td>{skipped_count}</td>
<td>{notes}</td>
</tr>""".format(**pipeline_stats)
html_table_message_stats_by_stage += "</table></p>"
#Final template
html = """\
<html>
<head>
</head>
<body bgcolor="#e3ecfc">
<p>
{}
<br>
{}
</p>
{}
{}
<br>
<p>
{}
</p>
<br>
<img src="{}" width="100" height="100">
</body>
</html>
""".format(text,
url,
html_table,
html_table_message_stats_by_stage,
html_DLQ_exception_list,
random.choice(_EYE_CANDY_LIST))
# Record the MIME types of both parts - text/plain and text/html.
part1 = MIMEText(html, 'html')
part2 = MIMEText(text, 'plain')
# Attach parts into message container.
# According to RFC 2046, the last part of a multipart message, in this case
# the HTML message, is best and preferred.
msg.attach(part1)
msg.attach(part2)
# Send the message via local SMTP server.
s = smtplib.SMTP(_MANDRILL_API_ENDPOINT)
# sendmail function takes 3 arguments: sender's address, recipient's address
# and message to send - here it is sent as one string.
s.login(import_ep_pipeline_mandrill_creds_dict['username'],
import_ep_pipeline_mandrill_creds_dict['password'])
s.sendmail(me, you, msg.as_string())
s.quit()
logger.info("Success email sent")
def compute_pipeline_stats_for_org(org_id, message_counts, orgs_ingest_lookup):
if message_counts[0] == -1:
collector_text = 'Error in collector avro!'
collector_count = 0
else:
collector_count = collector_text = message_counts[0]
agg_count = message_counts[1]
db_message_count = message_counts[2]
db_nrt_message_count = message_counts[6]
skipped_count = message_counts[3]
discrepancy_count = compute_discr(collector_count - db_message_count - skipped_count, collector_count)
discrepancy_pct = perc(collector_count - db_message_count - skipped_count, collector_count)
nrt_discrepancy_count = compute_discr(collector_count - db_nrt_message_count - skipped_count, collector_count)
nrt_discrepancy_pct = perc(collector_count - db_nrt_message_count - skipped_count, collector_count)
notes = ''
if not orgs_ingest_lookup[org_id]:
notes += 'Sandboxed ({})'.format(discrepancy_count)
discrepancy_pct = 0.0
discrepancy_count = 0
return {
'org_id': org_id,
'collector_text': collector_text,
'collector_count': collector_count,
'agg_count': agg_count,
'db_message_count': db_message_count,
'db_nrt_message_count': db_nrt_message_count,
'skipped_count': skipped_count,
'discrepancy_count': discrepancy_count,
'discrepancy_pct': discrepancy_pct,
'nrt_discrepancy_count': nrt_discrepancy_count,
'nrt_discrepancy_pct': nrt_discrepancy_pct,
'notes': notes,
}
# Compute discrepancy
def compute_discr(disc, collector_count): return 'Undefined' if collector_count == 0 else (disc)
# Compute a percentage
def perc(part, whole): return 'Undefined' if whole == 0 else (100 * round(float(part)/float(whole), 3))
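# Worked example (illustrative): perc(250, 10000) == 2.5, i.e. a 250-message
# shortfall against 10,000 collector messages is reported as 2.5%, while
# perc(250, 0) == 'Undefined' and compute_discr(250, 0) == 'Undefined' so
# orgs with no collector traffic never trigger a divide-by-zero.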
def send_email_notification_flow_successful(ds, **kwargs):
start_dt, end_dt = get_start_and_end_datetime(**kwargs)
generate_successful_email_util(start_dt, None, **kwargs)
return
# This function reads the precomputed per-org collector message counts
# (a JSON summary in the importer metadata bucket) for the execution window
def get_collector_file_message_counts_per_org(execution_date):
start_ts = execution_date.strftime(SPARK_DATE_FORMAT)
end_ts = compute_end_dt(execution_date, None).strftime(SPARK_DATE_FORMAT)
key_name = 'msg_counts/collector_msg_count_{}_{}.json'.format(start_ts, end_ts)
org_msg_count_dict = json.load(boto.connect_s3().get_bucket(import_airflow_importer_metadata_bucket_name).get_key(key_name))
return org_msg_count_dict
def get_collector_file_message_counts_per_org_old(ingest_disabled_orgs, execution_date):
# Create a local temp dir
temp_local_dir = tempfile.mkdtemp()
print "----------- get_collector_file_message_counts_per_org: temp local dir = {} for execution date = {}, skipping {}".format(temp_local_dir,
execution_date,
ingest_disabled_orgs)
# Convert ds into the appropriate date format to be used for identifying collector files!
collector_date_string = execution_date.strftime(COLLECTOR_VALIDATE_DATE_FORMAT)
# Generate exclude conditions for each org we want to skip
ingest_disabled_exclusion_list = ''
for org_id in ingest_disabled_orgs:
ingest_disabled_exclusion_list += " --exclude '*uploads/{}/*'".format(org_id)
print "get_collector_file_message_counts_per_org: ingest_disabled_exclusion_list = {}".format(ingest_disabled_exclusion_list)
# Download the collector files
command = "aws s3 cp --recursive --exclude '*' --include '*{}*' {} --exclude '*.gz' s3://{}/ {} ".format(collector_date_string,
ingest_disabled_exclusion_list,
import_airflow_s3_collector_ingest_bucket_name,
temp_local_dir)
_call_subprocess_simple(command, tries=4, sleep_secs=5)
# Counts the message per org
org_msg_count_dict = dict()
for org_dir in os.listdir("%s/uploads" % (temp_local_dir)):
temp_file = tempfile.NamedTemporaryFile()
try:
command = "for f in `find {}/{} -type f`; do echo $f; avro cat $f >> {} ; done".format(temp_local_dir,
"uploads/%s"%(org_dir),
temp_file.name)
_call_subprocess_simple(command)
temp_file.seek(0)
msg_count = 0
for line in temp_file:
msg_count = msg_count + 1
print "msg count = {}".format(msg_count)
org_msg_count_dict[org_dir] = msg_count
finally:
# Automatically cleans up the file
temp_file.close()
# Clean up the temp local dir
shutil.rmtree(temp_local_dir)
    # Print out the per-org message counts
pp.pprint(org_msg_count_dict)
# Returns a dict such as {'1': 52471, '11': 1, '2': 81, '5': 81, '6': 43383, '9': 81}
return org_msg_count_dict
# Helper method to handle subprocess failures
def _call_subprocess_simple(command, suppress_logging=False, allow_error=False, tries=2, sleep_secs=1):
if not suppress_logging:
logging_message = "Running : %s" % command
logger.info(logging_message)
# Attempt a "Try" number of times
attempts = 0
ret_code = -9
    while attempts < tries and ret_code != 0:
        logging_message = "%s Attempt at Running : %s" % (attempts, command)
        if not suppress_logging:
            logger.info(logging_message)
        ret_code = os.system(command)
        attempts = attempts + 1
        if ret_code != 0:
            time.sleep(sleep_secs)  # Wait sleep_secs before retrying
# If we allow errors, continue
error_message = "FAILURE on %s Attempts at Running : %s" % (attempts, command)
    if allow_error:
return ret_code
# Otherwise, log a critical error
if ret_code != 0:
logger.critical(error_message)
# Always return the ret_code
return ret_code
# Convenience method to get start and end datetimes
def get_start_and_end_datetime(**kwargs):
end_date = kwargs['execution_date'] + timedelta(seconds=int(import_airflow_granularity_secs))
return (kwargs['execution_date'], end_date)
# Slack-ops : Report discrepancy graph on Slack
def report_issues_on_slack_for_DLQs(dlq_q_size, dlq_msg_exception_s3_url_list, **kwargs):
if dlq_q_size > 0:
print '------------> {}'.format(dlq_msg_exception_s3_url_list[0])
DLQs_found_text = "{} - {} on {} completed `{}` with {} DLQs : <{} | Sample Exception> ".format(ENV,
kwargs["dag"].dag_id,
kwargs["ti"].hostname,
kwargs["execution_date"],
dlq_q_size,
'{}'.format(dlq_msg_exception_s3_url_list[0]))
report_issues_on_slack_util(DLQs_found_text)
# Slack-ops : Report discrepancy graph on Slack
def report_issues_on_slack_for_high_discrepancies(pipeline_stats, **kwargs):
base_url = kwargs['conf'].get('webserver', 'base_url')
collector_file_count_threshold = int(import_ep_pipeline_discrepancy_alerting_config['collector_file_count_threshold'])
discrepancy_percentage_alerting_threshold = int(import_ep_pipeline_discrepancy_alerting_config['discrepancy_percentage_alerting_threshold'])
discrepancy_chart_id = int(import_ep_pipeline_discrepancy_alerting_config['discrepancy_chart_id'])
url = '{}/admin/airflow/chart?chart_id={}'.format(base_url, discrepancy_chart_id)
if pipeline_stats['collector_count'] > collector_file_count_threshold and pipeline_stats['discrepancy_pct'] > discrepancy_percentage_alerting_threshold:
        high_discrepancy_text = "{} - {} on {} completed `{}` with <{} | High Discrepancies>".format(ENV,
                                                                                                     kwargs["dag"].dag_id,
                                                                                                     kwargs["ti"].hostname,
                                                                                                     kwargs["execution_date"],
                                                                                                     url)
        report_issues_on_slack_util(high_discrepancy_text)
# Slack-ops : Slack reporting - util function
def report_issues_on_slack_util(text):
if import_ep_ops_slack_alerting_enabled:
token = slack_api_token
sc = SlackClient(token)
sc.api_call("chat.postMessage", **{"channel":ep_ops_slack_channel_name,
"username":"Airflow",
"icon_url":'https://raw.githubusercontent.com/airbnb/airflow/master/airflow/www/static/pin_100.png',
"text":text})
# Insert discrepancy metrics in the db
def insert_discrepancy_metrics_in_db(start_dt, message_stats_by_stage, dlq_q_size, orgs_ingest_lookup, **kwargs):
# Delete overlapping data
delete_db_discrepancy_data(import_ep_db_connect_string, start_dt)
# Initialize the DB Conn and cursor
db_conn = psycopg2.connect(import_ep_db_connect_string)
logger.info("----- Successfully Connected to database {}".format(import_ep_db_connect_string))
cursor = db_conn.cursor()
INSERT_SQL = '''INSERT INTO telemetry_data_discrepancy (execution_date, organization_id, granularity_secs, collector_msgs, agg_msgs, db_msgs, db_nrt_msgs, discrepancy, discrepancy_percentage, agg_filtered_gm_spam_int, dlq_q_size, created_at, updated_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'''
# Given a dict[org_id ==> message_counts]
for org_id, message_counts in message_stats_by_stage.iteritems():
pipeline_stats = compute_pipeline_stats_for_org(org_id, message_counts, orgs_ingest_lookup)
# Initialize key columns
granularity_sec = import_airflow_granularity_secs
# Current time
now = datetime.now()
# Pass data to fill a query placeholders and let Psycopg perform
# the correct conversion (no more SQL injections!)
logger.info("----- Executing the following query against the db : {}".format(INSERT_SQL))
cursor.execute(INSERT_SQL, (start_dt,
pipeline_stats['org_id'],
granularity_sec,
pipeline_stats['collector_count'],
pipeline_stats['agg_count'],
pipeline_stats['db_message_count'],
pipeline_stats['db_nrt_message_count'],
pipeline_stats['discrepancy_count'],
pipeline_stats['discrepancy_pct'],
pipeline_stats['skipped_count'],
dlq_q_size,
now,
now))
db_conn.commit() #close the transaction
# If discrepancies rise above a certain level, report them on slack for each org!
report_issues_on_slack_for_high_discrepancies(pipeline_stats, **kwargs)
# close the cursor and connection
cursor.close()
db_conn.close()
def spark_agg_retry(context):
#purge queue
logger.info("----- Preparing for aggregation retry")
purge_sqs_queue(import_sqs_queue_name, purge_if_empty=True)
start_dt = context['execution_date']
end_dt = start_dt + timedelta(seconds=int(import_airflow_granularity_secs))
# Delete data
delete_db_data_util(import_ep_db_connect_string, start_dt, end_dt)
def purge_sqs_queue(queue_name, purge_if_empty=False):
conn = boto.sqs.connect_to_region(region)
q = conn.get_queue(queue_name)
q_size = q.count()
logger.info("----- Queue {} has {} messages".format(queue_name, q_size))
if purge_if_empty or q_size > 0:
logger.info("----- About to purge {}".format(queue_name))
was_successful = q.purge()
logger.info("----- Purged {} and received a return status of {}".format(queue_name,
was_successful))
else:
was_successful = True
return was_successful
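# Illustrative sketch (not part of the original gist) of how the callbacks in this
# module are presumably attached in the DAG definition; dag id, owner, schedule and
# start_date are assumptions:
#
#   default_args = {
#       'owner': 'ep',
#       'start_date': datetime(2016, 1, 1),
#       'priority_weight': import_airflow_priority_weight,
#       'on_retry_callback': spark_agg_retry,        # purge the queue / delete partial data before a retry
#   }
#   dag = DAG('ep_telemetry_pipeline',
#             default_args=default_args,
#             schedule_interval=timedelta(seconds=import_airflow_granularity_secs),
#             sla_miss_callback=sla_alert_func)       # signature matches sla_alert_func above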