Skip to content

Instantly share code, notes, and snippets.

@michimani
Created December 20, 2018 05:56
Show Gist options
  • Select an option

  • Save michimani/4ca2a6a07e9fe723e26e22fd48aedff7 to your computer and use it in GitHub Desktop.

Select an option

Save michimani/4ca2a6a07e9fe723e26e22fd48aedff7 to your computer and use it in GitHub Desktop.
Notify CloudWatch Alarm to Slack via SNS.
import boto3
import json
import logging
import os
from base64 import b64decode
from urllib.error import URLError, HTTPError
from urllib.request import Request, urlopen
SLACK_CHANNEL = os.environ['slackChannel']
ENCRYPTED_HOOK_URL = os.environ['kmsEncryptedHookUrl']
HOOK_URL = boto3.client('kms').decrypt(CiphertextBlob=b64decode(ENCRYPTED_HOOK_URL))['Plaintext'].decode('utf-8')
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
"""Lambda handler."""
logger.info('Event: ' + str(event))
message = json.loads(event['Records'][0]['Sns']['Message'])
logger.info('Message: ' + str(message))
alarm_name = message['Trigger']['MetricName']
new_state = message['NewStateValue']
reason = message['NewStateReason']
state_color = '#00FF00'
if new_state != 'OK':
state_color = '#FF0000'
slack_message = {
'channel': SLACK_CHANNEL,
# 'icon_emoji': ':cloudwatch-%s:' % (new_state.lower()),
'attachments': [
{
'color': state_color,
'fields': [
{
'value': "*%s* state is now *%s*\n```\n%s\n```" % (alarm_name, new_state, reason)
}
]
}
]
}
req = Request(HOOK_URL, json.dumps(slack_message).encode('utf-8'))
try:
response = urlopen(req)
response.read()
logger.info('Message posted to %s', slack_message['channel'])
except HTTPError as e:
logger.error('Request failed: %d %s', e.code, e.reason)
except URLError as e:
logger.error('Server connection failed: %s', e.reason)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment