Skip to content

Instantly share code, notes, and snippets.

@lukehutton
Last active August 1, 2018 05:44
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save lukehutton/74c47f7fd0dfdf08cc26d6e3403fd61a to your computer and use it in GitHub Desktop.
Save lukehutton/74c47f7fd0dfdf08cc26d6e3403fd61a to your computer and use it in GitHub Desktop.
from __future__ import print_function
import json, httplib, boto3
def get_slack_msg(msg):
    """Return the Slack text for a decoded CodeDeploy notification.

    A notification that contains a 'status' key is formatted by
    get_deploy_msg(); any other notification is formatted by
    get_deploy_instance_msg().
    """
    if 'status' in msg:
        return get_deploy_msg(msg)
    return get_deploy_instance_msg(msg)
def get_deploy_msg(msg):
    """Build the Slack text for a deployment-level CodeDeploy notification.

    The deployment is looked up through the CodeDeploy API so the S3 key of
    the deployed revision can be included in the text.

    Args:
        msg: Decoded notification dict.  Must contain 'status',
            'deploymentGroupName' and 'deploymentId'; 'errorInformation'
            is used when present.

    Returns:
        '[<deployment group>] <s3 key> Deploy SUCCEEDED' when the status is
        'SUCCEEDED'.  Every other status is reported as
        '[<deployment group>] <s3 key> Deploy FAILED', followed by a newline
        and the error information when the notification carries any.

    Raises:
        KeyError: if a required key is missing from msg, or the deployment's
            revision has no 's3Location' entry.
    """
    success = msg['status'] == 'SUCCEEDED'
    deployment_group = msg['deploymentGroupName']
    deployment_id = msg['deploymentId']

    codedeploy_client = boto3.client('codedeploy')
    deployment = codedeploy_client.get_deployment(deploymentId=deployment_id)
    package = deployment['deploymentInfo']['revision']['s3Location']['key']

    status_text = 'SUCCEEDED' if success else 'FAILED'
    text = '[%s] %s Deploy %s' % (deployment_group, package, status_text)

    if not success:
        # A non-successful notification is not guaranteed to carry error
        # details; indexing the key directly would raise KeyError and the
        # Slack message would never be sent.
        error_info = msg.get('errorInformation')
        if error_info:
            # %-formatting instead of '+' so a non-string value cannot
            # raise TypeError.
            text = '%s\n%s' % (text, error_info)
    return text
def get_deploy_instance_msg(msg):
    """Build the Slack text for an instance-level CodeDeploy notification.

    The deployment is looked up through the CodeDeploy API to obtain the S3
    key of the deployed revision.  The instance's 'Name' and 'environment'
    tags are read from EC2 when the instance id starts with 'i-', otherwise
    from the CodeDeploy on-premises instance registration.

    Args:
        msg: Decoded notification dict.  Must contain 'instanceStatus',
            'deploymentId' and 'instanceId'.

    Returns:
        '[<environment>] <s3 key> Deploy to instance <name> SUCCEEDED' when
        'instanceStatus' is 'Succeeded', otherwise the same text ending in
        'FAILED'.  When the 'Name' tag is missing the instance id is used as
        the name; when the 'environment' tag is missing 'unknown' is used.

    Raises:
        KeyError: if a required key is missing from msg, or the deployment's
            revision has no 's3Location' entry.
    """
    def tag_value(tags, key, default):
        # The tag list may be missing altogether (boto3 is documented to
        # report an untagged EC2 instance as tags=None), so treat None like
        # an empty list instead of failing on iteration / [0].
        for tag in tags or ():
            if tag['Key'] == key:
                return tag['Value']
        return default

    success = msg['instanceStatus'] == 'Succeeded'
    deployment_id = msg['deploymentId']
    instance_id = msg['instanceId']

    codedeploy_client = boto3.client('codedeploy')
    deployment = codedeploy_client.get_deployment(deploymentId=deployment_id)
    package = deployment['deploymentInfo']['revision']['s3Location']['key']

    # NOTE(review): the notification may deliver instanceId either as a bare
    # id ('i-0abc') or as an ARN ending in '/i-0abc' -- confirm against real
    # notifications.  Taking the last path segment handles both forms.
    short_id = instance_id.rsplit('/', 1)[-1]

    # Prefix test, not substring: an on-premises name such as 'api-01'
    # contains 'i-' but is not an EC2 instance id.
    if short_id.startswith('i-'):
        ec2 = boto3.resource('ec2')
        matches = ec2.instances.filter(InstanceIds=[short_id])
        instance = next(iter(matches), None)
        tags = instance.tags if instance is not None else None
    else:
        on_prem_instance = codedeploy_client.get_on_premises_instance(instanceName=instance_id)
        tags = on_prem_instance['instanceInfo'].get('tags')

    instance_name = tag_value(tags, 'Name', short_id)
    environment = tag_value(tags, 'environment', 'unknown')

    status_text = 'SUCCEEDED' if success else 'FAILED'
    return '[%s] %s Deploy to instance %s %s' % (environment, package, instance_name, status_text)
def send_slack(msg):
    """Post *msg* to the configured Slack incoming webhook.

    The colour is 'good' when the text contains 'SUCCEEDED' and 'danger'
    otherwise.  Delivery is best-effort: a non-200 reply from Slack is
    printed but not raised, and the HTTPS connection is always closed.

    Args:
        msg: Text to post.

    Returns:
        None.
    """
    slack_channel = 'xxx'
    slack_user = 'AWS CodeDeploy'
    slack_url = '/services/xxx/yyy/zzz'
    icon_url = "xxx"
    color = 'good' if 'SUCCEEDED' in msg else 'danger'
    payload = {
        "channel": slack_channel,
        "username": slack_user,
        "fallback": "",
        "pretext": "",
        "color": color,
        "fields": [{"value": msg}],
        "icon_url": icon_url,
    }
    headers = {'Content-Type': 'application/json'}

    connection = httplib.HTTPSConnection('hooks.slack.com')
    try:
        connection.request('POST', slack_url, json.dumps(payload), headers)
        response = connection.getresponse()
        # Read the reply so the exchange is complete before closing, and so
        # a rejection by Slack is visible instead of silently ignored.
        body = response.read()
        if response.status != 200:
            # %r avoids any encoding error when printing the raw body.
            print("Slack webhook returned %s %s: %r"
                  % (response.status, response.reason, body))
    finally:
        # Release the socket even when the request or the read fails.
        connection.close()
def lambda_handler(event, context):
    """Relay the notification carried by an SNS event to Slack.

    Prints the raw event, decodes the JSON message of the first record,
    formats it with get_slack_msg() and posts it with send_slack().

    Args:
        event: Event dict; the message is read from
            event['Records'][0]['Sns']['Message'].
        context: Unused.

    Returns:
        The text that was sent to Slack.
    """
    print("Received event: " + json.dumps(event, indent=2))
    first_record = event['Records'][0]
    notification = json.loads(first_record['Sns']['Message'])
    slack_text = get_slack_msg(notification)
    send_slack(slack_text)
    return slack_text
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment