Skip to content

Instantly share code, notes, and snippets.

@ramast
Forked from amertkara/aws_utils.py
Last active September 19, 2016 07:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ramast/5d4bfa07f9a71a52b86526f44eadc11b to your computer and use it in GitHub Desktop.
Save ramast/5d4bfa07f9a71a52b86526f44eadc11b to your computer and use it in GitHub Desktop.
Amazon SNS Notification Verification with Python, M2Crypto. When the SNS pushes a notification, a receiver should verify the origin/integrity of the push notification (AWS) using the signature and certificate provided in the notification data. The function `verify_sns_notification` below takes the request object and verifies the origin/integrity…
# -*- coding: utf-8 -*-
import json
import urllib2
import re
from base64 import b64decode
from M2Crypto import X509
from M2Crypto.Err import M2CryptoError
# Message type values that verify_sns_notification() recognizes; they are
# compared against the value returned by get_sns_message_type().
SNS_MESSAGE_TYPE_SUB_NOTIFICATION = "SubscriptionConfirmation"
SNS_MESSAGE_TYPE_NOTIFICATION = "Notification"
SNS_MESSAGE_TYPE_UNSUB_NOTIFICATION = "UnsubscribeConfirmation"
# Pattern the SigningCertURL must match before the certificate is downloaded:
# an https URL whose host has the form sns.<label>.amazonaws.com (matched
# case-insensitively), followed by a path.
VALID_AMAZON_URL = re.compile(r"^https://sns\.[a-z\-0-9_]+\.amazonaws\.com/.*$", re.IGNORECASE)
def canonical_message_builder(content, fmt):
    """Build the canonical message whose signature is to be verified.

    Fields are emitted in sorted name order (a requirement from AWS), each
    as the field name, a newline, the field value and another newline.
    Fields listed in ``fmt`` but missing from ``content`` are skipped.

    Args:
        content (dict): Parsed body of the response
        fmt (list): List of the fields that need to go into the message

    Returns (str):
        canonical message. On Python 2, where the parsed JSON values are
        ``unicode``, the result is encoded as a UTF-8 byte string.

    Raises:
        TypeError: If a field value present in ``content`` is not a string
    """
    parts = []
    for field in sorted(fmt):
        try:
            value = content[field]
        except KeyError:
            # Build with what you have
            continue
        parts.append(field + "\n" + value + "\n")
    message = "".join(parts)
    if not isinstance(message, str):
        # Python 2 only: json.loads() yields unicode, and str(unicode) would
        # raise UnicodeEncodeError on non-ASCII text, so encode explicitly.
        message = message.encode("utf-8")
    return message
def get_sns_message_type(request):
    """Return the SNS message type carried in the request metadata.

    Args:
        request (HTTPRequest): Request object exposing a ``META`` mapping

    Returns:
        The value stored under ``HTTP_X_AMZ_SNS_MESSAGE_TYPE``

    Raises:
        KeyError: If the request carries no such entry
    """
    meta = request.META
    message_type = meta["HTTP_X_AMZ_SNS_MESSAGE_TYPE"]
    return message_type
def verify_sns_notification(request):
    """ Takes a notification request from Amazon push service SNS and verifies the origin of the notification.

    The canonical message is rebuilt from the parsed request body and checked
    against the base64-encoded ``Signature`` field, using the public key of
    the certificate downloaded from ``SigningCertURL``.

    Kudos to Artur Rodrigues for suggesting M2Crypto: http://goo.gl/KAgPPc

    Args:
        request (HTTPRequest): The request object that is passed to the view function

    Returns (bool):
        True if the message passes the verification, False otherwise
        (including when the request carries no SNS message type header)

    Raises:
        ValueError: If the body of the request couldn't be parsed, the message
            type is not recognized, or SigningCertURL is not an expected
            Amazon SNS URL
        KeyError: If the body lacks the Signature or SigningCertURL field
        M2CryptoError: If an error raises during the verification process
        URLError: If the SigningCertURL couldn't be opened
    """
    canonical_sub_unsub_format = ["Message", "MessageId", "SubscribeURL", "Timestamp", "Token", "TopicArn", "Type"]
    canonical_notification_format = ["Message", "MessageId", "Subject", "Timestamp", "TopicArn", "Type"]

    content = json.loads(request.body)
    decoded_signature = b64decode(content["Signature"])

    # Depending on the message type, canonical message format varies: http://goo.gl/oSrJl8
    try:
        message_type = get_sns_message_type(request)
    except KeyError:
        return False

    if message_type in (SNS_MESSAGE_TYPE_SUB_NOTIFICATION, SNS_MESSAGE_TYPE_UNSUB_NOTIFICATION):
        canonical_message = canonical_message_builder(content, canonical_sub_unsub_format)
    elif message_type == SNS_MESSAGE_TYPE_NOTIFICATION:
        canonical_message = canonical_message_builder(content, canonical_notification_format)
    else:
        raise ValueError("Message Type (%s) is not recognized" % message_type)

    # Only fetch certificates from an Amazon SNS host; otherwise anyone could
    # point us at a certificate of their own.
    cert_url = content["SigningCertURL"]
    if not VALID_AMAZON_URL.match(cert_url):
        raise ValueError("Unexpected amazon url %s" % cert_url)

    # Download the certificate, making sure the connection is always released
    response = urllib2.urlopen(cert_url)
    try:
        cert_pem = str(response.read())
    finally:
        response.close()

    # SignatureVersion "2" messages are signed with SHA256withRSA; version "1"
    # (and messages without the field) use SHA1withRSA.
    if content.get("SignatureVersion") == "2":
        digest = 'sha256'
    else:
        digest = 'sha1'

    # Load the certificate and extract the public key
    cert = X509.load_cert_string(cert_pem)
    pubkey = cert.get_pubkey()
    pubkey.reset_context(md=digest)
    pubkey.verify_init()

    # Feed the canonical message so it can be checked against the signature
    pubkey.verify_update(canonical_message)

    # M2Crypto uses EVP_VerifyFinal() from openssl as the underlying verification function.
    # http://goo.gl/Bk2G36: "EVP_VerifyFinal() returns 1 for a correct signature, 0 for failure and -1
    # if some other error occurred."
    verification_result = pubkey.verify_final(decoded_signature)
    if verification_result not in (0, 1):
        raise M2CryptoError("Some error occurred while verifying the signature.")
    return bool(verification_result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment