Skip to content

Instantly share code, notes, and snippets.

@jakebrinkmann
Created December 9, 2021 00:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jakebrinkmann/20176f0172eef09c0d2c6c1b23e75945 to your computer and use it in GitHub Desktop.
Save jakebrinkmann/20176f0172eef09c0d2c6c1b23e75945 to your computer and use it in GitHub Desktop.
AWS Cloudwatch Alarm to Lambda to Slack Webhook (python)
import datetime
import json
import os
import requests
WEBHOOK_URL: str = os.getenv("WEBHOOK_URL")
SLACK_CHANNEL: str = os.getenv("SLACK_CHANNEL", "#devops-alarms")
SLACK_USERNAME: str = os.getenv("SLACK_USERNAME", "aws-alarm-bot")
def state_color(value: str) -> str:
if value == "ALARM":
return "danger"
elif value == "OK":
return "good"
else:
return "warning"
def parse_change_time(value: str) -> int:
"""Convert change time to unix timestamp."""
return datetime.datetime.strptime(
value, "%Y-%m-%dT%H:%M:%S.%f%z"
).timestamp()
def alarm_to_slack_data(msg: dict) -> dict:
_message = {
"channel": SLACK_CHANNEL,
"attachments": [
{
"title": ":boom: *AWS CloudWatch Notification* :boom:",
"ts": parse_change_time(msg["StateChangeTime"]),
"color": state_color(msg["NewStateValue"]),
"fields": [
{
"title": "Region",
"value": msg["Region"],
"short": True,
},
{
"title": "Account ID",
"value": msg["AWSAccountId"],
"short": True,
},
{
"title": "Alarm Name",
"value": msg["AlarmName"],
"short": True,
},
{
"title": "Alarm Description",
"value": msg["AlarmDescription"],
"short": True,
},
{
"title": "Reason",
"value": msg["NewStateReason"],
"short": False,
},
{
"title": "Metric Namespace",
"value": msg["Trigger"]["Namespace"],
"short": True,
},
{
"title": "Metric Namespace",
"value": msg["Trigger"]["MetricName"],
"short": True,
},
],
},
],
}
return _message
def lambda_handler(event: dict, context: object):
alarm_message = json.loads(event["Records"][0]["Sns"]["Message"])
print(alarm_message)
slack_message = alarm_to_slack_data(alarm_message)
result = requests.post(WEBHOOK_URL, json=slack_message)
print(result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment