"""Notify Slack of CloudWatch alarm state changes delivered via SNS."""
import boto3
import json
import logging
import os
from base64 import b64decode
from urllib.error import URLError, HTTPError
from urllib.request import Request, urlopen
# Slack channel the notification is posted to (required environment variable).
SLACK_CHANNEL = os.environ['slackChannel']

# Base64-encoded, KMS-encrypted webhook URL (required environment variable).
ENCRYPTED_HOOK_URL = os.environ['kmsEncryptedHookUrl']

# Decrypted at module level, so the KMS call happens once per import rather
# than once per handler invocation.
HOOK_URL = boto3.client('kms').decrypt(
    CiphertextBlob=b64decode(ENCRYPTED_HOOK_URL),
)['Plaintext'].decode('utf-8')

logger = logging.getLogger()
# The root logger defaults to WARNING; without this the handler's info-level
# messages (event dump, "Message posted to ...") would be silently dropped.
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
    """Post a CloudWatch alarm state change, received through SNS, to Slack.

    Args:
        event: Lambda event. The alarm payload is read as a JSON string from
            ``event['Records'][0]['Sns']['Message']``; only the first record
            is processed.
        context: Lambda context object (unused).

    Returns:
        None. HTTP and connection failures while posting to the webhook are
        logged at error level and not re-raised.

    Raises:
        KeyError, IndexError: If the event or the alarm payload lacks one of
            the expected fields.
        json.JSONDecodeError: If the SNS message is not valid JSON.
    """
    logger.info('Event: %s', event)
    message = json.loads(event['Records'][0]['Sns']['Message'])
    logger.info('Message: %s', message)

    # NOTE(review): this reads the metric name from the trigger, not the
    # alarm's own name; confirm whether message['AlarmName'] was intended.
    alarm_name = message['Trigger']['MetricName']
    new_state = message['NewStateValue']
    reason = message['NewStateReason']

    # Green for OK, red for every other state.
    state_color = '#00FF00' if new_state == 'OK' else '#FF0000'

    slack_message = {
        'channel': SLACK_CHANNEL,
        # Optional per-state emoji, currently disabled:
        # 'icon_emoji': ':cloudwatch-%s:' % (new_state.lower()),
        'attachments': [
            {
                'color': state_color,
                'fields': [
                    {
                        'value': "*%s* state is now *%s*\n```\n%s\n```" % (alarm_name, new_state, reason),
                    },
                ],
            },
        ],
    }

    req = Request(HOOK_URL, json.dumps(slack_message).encode('utf-8'))
    try:
        # The context manager closes the HTTP response once it has been read.
        with urlopen(req) as response:
            response.read()
        logger.info('Message posted to %s', slack_message['channel'])
    except HTTPError as e:
        # HTTPError is a subclass of URLError, so it must be caught first.
        logger.error('Request failed: %d %s', e.code, e.reason)
    except URLError as e:
        logger.error('Server connection failed: %s', e.reason)
