Skip to content

Instantly share code, notes, and snippets.

@oremj
Last active September 22, 2016 13:59
Show Gist options
  • Save oremj/4668864 to your computer and use it in GitHub Desktop.
Save oremj/4668864 to your computer and use it in GitHub Desktop.
Configure CloudWatch to publish alarms to an SNS topic, then subscribe an SQS queue to that topic. Next, run this script with the --queue and --service-key arguments. This assumes that the boto environment variables are set or that ~/.boto exists.
import argparse
import json
import time
import requests
from boto.sqs import connect_to_region
from boto.sqs.message import RawMessage
def get_messages(queue, region='us-west-2'):
    """Fetch one batch of messages from the named SQS queue.

    Args:
        queue: Name of the SQS queue to read from.
        region: AWS region name used to open the SQS connection.

    Returns:
        Whatever ``Queue.get_messages()`` returns for the queue, with the
        queue's message class set to ``RawMessage``.

    Raises:
        ValueError: If boto does not recognise ``region`` or no queue named
            ``queue`` exists in it.
    """
    conn = connect_to_region(region)
    # boto returns None (rather than raising) for an unrecognised region.
    if conn is None:
        raise ValueError('Unknown SQS region: %s' % region)

    sqs_queue = conn.get_queue(queue)
    # get_queue() likewise returns None when no queue matches the name;
    # fail with a clear message instead of an AttributeError on None.
    if sqs_queue is None:
        raise ValueError('SQS queue not found: %s (region %s)'
                         % (queue, region))

    sqs_queue.set_message_class(RawMessage)
    return sqs_queue.get_messages()
def get_alert(msg):
    """Extract the alarm payload from a queued notification.

    The message body is parsed as JSON, and the string stored under its
    ``'Message'`` key is parsed as JSON a second time.

    Args:
        msg: Object exposing ``get_body()`` that returns the JSON body text.

    Returns:
        The decoded inner ``'Message'`` document.
    """
    envelope = json.loads(msg.get_body())
    return json.loads(envelope['Message'])
class PagerDuty(object):
    """Small client that posts trigger/resolve events to PagerDuty.

    Every event is sent as a JSON document to ``PAGERDUTY_API`` and carries
    the service key given at construction time.
    """

    PAGERDUTY_API = 'https://events.pagerduty.com/generic/2010-04-15/create_event.json'

    # Seconds to wait on the HTTP call. Without a timeout, requests may block
    # indefinitely on a stalled connection and freeze the caller.
    REQUEST_TIMEOUT = 10

    def __init__(self, service_key):
        """Remember the service key that is attached to every event."""
        self.service_key = service_key

    def _build_event(self, event_type, payload):
        """Return a new event dict: ``payload`` plus service key and type.

        A copy is made so the caller's ``payload`` dict is never modified.
        """
        event = dict(payload)
        event['service_key'] = self.service_key
        event['event_type'] = event_type
        return event

    def pagerduty_request(self, event_type, payload):
        """POST one event to PagerDuty.

        Args:
            event_type: Event type string, e.g. ``'trigger'`` or ``'resolve'``.
            payload: Dict of extra event fields; it is not mutated.

        Returns:
            The ``requests`` response object for the POST.

        Raises:
            Whatever ``requests.post`` raises, including a timeout after
            ``REQUEST_TIMEOUT`` seconds.
        """
        event = self._build_event(event_type, payload)
        return requests.post(self.PAGERDUTY_API,
                             data=json.dumps(event),
                             timeout=self.REQUEST_TIMEOUT)

    def trigger_alert(self, key, description):
        """Send a ``trigger`` event for incident ``key`` with ``description``."""
        self.pagerduty_request('trigger', {'description': description,
                                           'incident_key': key})

    def trigger_resolve(self, key):
        """Send a ``resolve`` event for incident ``key``."""
        self.pagerduty_request('resolve', {'incident_key': key})

    def process_alert(self, alert):
        """Translate one alarm dict into a PagerDuty event.

        Args:
            alert: Dict with ``'AlarmName'``, ``'NewStateValue'`` and
                ``'NewStateReason'`` keys.

        The alarm name is used as the incident key. State ``'OK'`` resolves
        the incident; any other state triggers it, described as
        ``'<name>: <reason>'``.
        """
        key = alert['AlarmName']
        state = alert['NewStateValue']
        reason = alert['NewStateReason']

        if state == 'OK':
            self.trigger_resolve(key=key)
        else:
            self.trigger_alert(key=key,
                               description='%s: %s' % (key, reason))
def loop(queue, service_key, poll_interval=30):
    """Poll an SQS queue forever, forwarding each alarm to PagerDuty.

    Args:
        queue: Name of the SQS queue to poll.
        service_key: PagerDuty service key used for every event.
        poll_interval: Seconds to sleep between polls (default 30).

    This function never returns. Any exception raised while fetching or
    forwarding messages is itself reported to PagerDuty under the incident
    key ``'sqstopagerduty<queue>'``.
    """
    # Local import: only needed for the last-resort error report below.
    import traceback

    pager = PagerDuty(service_key)
    while True:
        try:
            for msg in get_messages(queue):
                pager.process_alert(get_alert(msg))
                # Delete only after forwarding, so a message whose
                # forwarding raised is not deleted.
                msg.delete()
        except Exception as e:
            try:
                pager.trigger_alert(
                    key='sqstopagerduty%s' % queue,
                    description='SQS to Pagerduty is broken: %s' % e)
            except Exception:
                # Reporting the failure failed as well (e.g. the network is
                # down). Print the traceback and keep polling rather than
                # letting the daemon die.
                traceback.print_exc()
        time.sleep(poll_interval)
def build_parser():
    """Build the command-line parser for this script.

    Returns:
        An ``argparse.ArgumentParser`` with the required ``--queue`` and
        ``--service-key`` options.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--queue', help='SQS queue name', required=True)
    parser.add_argument('--service-key',
                        help='PagerDuty service key', required=True)
    return parser


if __name__ == '__main__':
    # Parse the command line and start the polling loop.
    args = build_parser().parse_args()
    loop(args.queue, args.service_key)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment