Post CloudWatch alarm notifications, delivered via SNS, to Slack.
import boto3 | |
import json | |
import logging | |
import os | |
from base64 import b64decode | |
from urllib.error import URLError, HTTPError | |
from urllib.request import Request, urlopen | |
# Slack channel the notification is addressed to, read from the environment.
SLACK_CHANNEL = os.environ['slackChannel']

# The webhook URL arrives as base64-encoded KMS ciphertext in the environment.
# It is decrypted here, at module import, rather than inside the handler.
ENCRYPTED_HOOK_URL = os.environ['kmsEncryptedHookUrl']
HOOK_URL = (
    boto3.client('kms')
    .decrypt(CiphertextBlob=b64decode(ENCRYPTED_HOOK_URL))['Plaintext']
    .decode('utf-8')
)

# Root logger, emitting INFO and above.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
    """Post a CloudWatch alarm state change to Slack.

    Decodes the JSON alarm message carried by the first SNS record of
    ``event``, renders it as a single coloured Slack attachment (green for
    ``OK``, red for any other state) and POSTs it to ``HOOK_URL``. HTTP and
    connection errors from the POST are logged rather than raised.

    Args:
        event: Lambda event. ``event['Records'][0]['Sns']['Message']`` must
            be a JSON string with ``NewStateValue`` and ``NewStateReason``,
            plus either ``Trigger.MetricName`` or ``AlarmName``.
        context: Lambda context object (unused).

    Returns:
        None.

    Raises:
        KeyError: If the event or the alarm message lacks a required field.
        json.JSONDecodeError: If the SNS message body is not valid JSON.
    """
    logger.info('Event: %s', event)

    message = json.loads(event['Records'][0]['Sns']['Message'])
    logger.info('Message: %s', message)

    # The notification is labelled with the triggering metric's name, as
    # before. Messages without Trigger.MetricName (NOTE(review): believed to
    # be the case for metric-math and composite alarms) used to raise
    # KeyError; they are now labelled with the alarm's own name instead.
    trigger = message.get('Trigger') or {}
    alarm_name = trigger.get('MetricName') or message['AlarmName']
    new_state = message['NewStateValue']
    reason = message['NewStateReason']

    # Green only for OK; every other state (e.g. ALARM) is shown in red.
    state_color = '#00FF00' if new_state == 'OK' else '#FF0000'

    slack_message = {
        'channel': SLACK_CHANNEL,
        'attachments': [
            {
                'color': state_color,
                'fields': [
                    {
                        'value': "*%s* state is now *%s*\n```\n%s\n```" % (alarm_name, new_state, reason)
                    }
                ]
            }
        ]
    }

    # Declare the body as JSON; without an explicit header urllib labels a
    # POST body as application/x-www-form-urlencoded.
    req = Request(
        HOOK_URL,
        data=json.dumps(slack_message).encode('utf-8'),
        headers={'Content-Type': 'application/json'},
    )

    try:
        # The context manager closes the HTTP response (and its socket) even
        # if reading the body fails.
        with urlopen(req) as response:
            response.read()
    except HTTPError as e:
        logger.error('Request failed: %d %s', e.code, e.reason)
    except URLError as e:
        logger.error('Server connection failed: %s', e.reason)
    else:
        logger.info('Message posted to %s', slack_message['channel'])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment