Skip to content

Instantly share code, notes, and snippets.

@KensoDev
Last active May 10, 2023 11:17
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save KensoDev/d9f5ea978b16bac06463c6c78191f220 to your computer and use it in GitHub Desktop.
Save KensoDev/d9f5ea978b16bac06463c6c78191f220 to your computer and use it in GitHub Desktop.
Lambda function to notify slack on ECS failed deployments
"""
Get notification from ECS when a deployment is failing. Post to slack
This lambda function receives a notification from a cloudwatch log filter when
an ECS deployment fails, it creates a hash based on the cluster arn and then
task definition and saves the file in S3.
To make sure we don't get duplicate, if we already have a file in S3, we don't
resend the notification.
"""
from json import dumps
from os import environ
from re import (
search
)
import requests
from boto3 import client
def handler(event, context):
task_definition_arn = event["detail"]["taskDefinitionArn"]
task_group = event["detail"]["group"]
cluster_arn = event["detail"]["clusterArn"]
event_json = dumps(event)
process_record(cluster_arn, task_group, task_definition_arn, event_json)
def get_message_key(cluster_arn, task_definition_arn, task_group):
m = search(r"/(.*)", cluster_arn)
cluster_name = m.group(1)
m = search(r"/(.*)", task_definition_arn)
task_definition_name = m.group(1)
return "{}/{}/{}-failure-message.json".format(cluster_name, task_group, task_definition_name)
def check_for_message(s3, bucket_name, message_key):
"""
Check if the message already exists in the bucket
:s3: S3 client
:bucket_name: Name of the bucket
:message_key: Key of the message to look for
:returns: False if the message doesn't exist and true if it does
"""
try:
s3.get_object(
Bucket=bucket_name,
Key=message_key,
)
return True
except Exception as e:
print(e) # noqa
return False
def post_to_slack(task_group, task_definition_arn):
"""
Post failure message to slack
:task_group: Task Group name from the AWS event
:task_definition_arn: Task Definition identifier from the AWS event
:returns: False if the message was not posted to slack True if it was
"""
webhook_url = environ["SLACK_WEBHOOK_URL"]
message = "The task [{}] is failing to deploy to [{}]".format(
task_group,
task_definition_arn,
)
slack_data = {"text": message}
response = requests.post(
webhook_url, data=dumps(slack_data),
headers={'Content-Type': 'application/json'}
)
if response.status_code != 200:
print("Error posting the message to slack, response wasn't 200") # noqa
return False
return True
def process_record(cluster_arn, task_group, task_definition_arn, event_json):
s3 = client("s3")
bucket_name = environ["BUCKET_NAME"]
message_key = get_message_key(cluster_arn, task_definition_arn, task_group)
if check_for_message(s3, bucket_name, message_key):
return True
if post_to_slack(task_group, task_definition_arn):
s3.put_object(
ACL="private",
Body=event_json,
Bucket=bucket_name,
Key=message_key,
ContentType="application/json",
)
print("Success!") # noqa
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment