Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
from __future__ import print_function
import json
import gzip
import boto3
import time
import httplib
import urllib
from cStringIO import StringIO
def lambda_handler(event, context):
    """Process SNS notifications that point at log objects stored in S3.

    For every SNS record in ``event`` the message body is parsed as JSON,
    each S3 object it names is fetched and decoded with ``get_object``, and
    every entry of that object's "Records" list is handed to ``eventJudge``.

    Args:
        event: Lambda invocation payload; its "Records" list is iterated.
            A payload without "Records" is treated as empty.
        context: Lambda context object (unused).

    Raises:
        KeyError: if an SNS record or its message lacks an expected field.
        ValueError: if the SNS message body is not valid JSON.
    """
    for record in event.get("Records", []):
        # Only SNS deliveries carry the message parsed below; .get() also
        # skips records that have no "EventSource" key at all.
        if record.get("EventSource") != "aws:sns":
            continue
        message = json.loads(record["Sns"]["Message"])
        print("Message: " + json.dumps(message))
        s3bucket = message["s3Bucket"]
        # "s3ObjectKey" is a list: handle every key it holds rather than
        # only the first, so no delivered object is silently ignored.
        for s3key in message["s3ObjectKey"]:
            print("bucket %s key %s" % (s3bucket, s3key))
            objs = get_object(s3bucket, s3key)
            for o in objs["Records"]:
                eventJudge(o)
def eventJudge(event):
    """Notify Slack when ``event`` records a completed AttachVolume call.

    Events with any other "eventName" are ignored. An AttachVolume event
    whose "responseElements" is missing/null or has no "attachTime" (as
    happens when the API call failed) is logged and skipped instead of
    raising, so one such record does not abort the rest of the batch.

    Args:
        event: a single decoded log record (dict).

    Raises:
        KeyError: if an AttachVolume event lacks "requestParameters" or
            its "volumeId"/"instanceId" entries.
    """
    if event.get("eventName") != "AttachVolume":
        return
    response = event.get("responseElements")
    if not response or "attachTime" not in response:
        print("AttachVolume event without attachTime; skipping notification")
        return
    # Not every identity type carries userName/accessKeyId/arn, so fall
    # back to placeholders rather than failing on the lookup.
    ui = event.get("userIdentity") or {}
    name = ui.get("userName") or ui.get("principalId", "unknown")
    aki = ui.get("accessKeyId", "unknown")
    arn = ui.get("arn", "unknown")
    req_param = event["requestParameters"]
    volume, instance = req_param["volumeId"], req_param["instanceId"]
    # attachTime is divided by 1000 before gmtime(), i.e. it is treated
    # as epoch milliseconds.
    t = time.gmtime(response["attachTime"] / 1000.0)
    notification_message = """Volume %s attached to instance %s at %s
by user %s (%s) with AKI %s
""" % (volume, instance, time.strftime("%Y-%m-%dT%H:%M:%SZ", t), name, arn, aki)
    print(notification_message)
    notify_slack(notification_message)
def get_object(s3bucket, s3objectkey):
    """Fetch a gzip-compressed JSON object from S3 and return it decoded.

    Args:
        s3bucket: name of the bucket holding the object.
        s3objectkey: key of the object inside that bucket.

    Returns:
        The value produced by ``json.loads`` on the decompressed body.
    """
    s3_object = boto3.resource("s3").Bucket(s3bucket).Object(s3objectkey)
    compressed = s3_object.get()["Body"].read()
    # GzipFile needs a file-like source, so wrap the downloaded bytes in
    # an in-memory buffer.
    reader = gzip.GzipFile(
        filename="", mode="rb", compresslevel=0, fileobj=StringIO(compressed)
    )
    return json.loads(reader.read())
def notify_slack(msg):
    """Post ``msg`` to the configured Slack incoming webhook.

    The message is sent as a form-encoded "payload" field containing a
    JSON document. The encoded request body, the HTTP status line, and the
    response body are printed for logging.

    Args:
        msg: text to post; it is prefixed with an ":exclamation:" emoji.

    Raises:
        Any socket/HTTP error raised while talking to Slack; the
        connection is closed before the error propagates.
    """
    slack_base = "hooks.slack.com"
    slack_path = "/services/xxxxxxxx/yyyyyyyy/zzzzzzzzzzzzzzzzzzzzzz"
    params = urllib.urlencode({
        "payload": json.dumps(
            {
                "text": ":exclamation: %s" % (msg),
                "username": "webhookbot",
                "icon_emoji": ":ghost:",
            }
        )
    })
    print(params)
    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    # A timeout keeps a stalled endpoint from blocking the function for
    # its whole execution time.
    conn = httplib.HTTPSConnection(slack_base, timeout=10)
    try:
        conn.request("POST", slack_path, params, headers)
        response = conn.getresponse()
        print(response.status, response.reason)
        print(response.read())
    finally:
        # Release the socket even when the request or read fails.
        conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment