Skip to content

Instantly share code, notes, and snippets.

@nackjicholson
Last active May 13, 2019 16:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nackjicholson/d558a2cdcb644dd692a877d6683f4df9 to your computer and use it in GitHub Desktop.
Save nackjicholson/d558a2cdcb644dd692a877d6683f4df9 to your computer and use it in GitHub Desktop.
Snippet of Python to verify SNS signatures in a Flask app.
import json
import logging
from base64 import b64decode
from datetime import datetime, timezone
from urllib.request import urlopen
from M2Crypto import X509
from flask import Blueprint, request, jsonify
from werkzeug.exceptions import BadRequest, Unauthorized
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Flask blueprint named "events"; every route registered on it is mounted
# under the "/api/events" URL prefix.
# NOTE(review): the webhook in this file is registered at "/events", so its
# full path is presumably "/api/events/events" -- confirm this is intended.
bp = Blueprint("events", __name__, url_prefix="/api/events")
def get_aws_signing_cert(url):
    """Download the PEM signing certificate that an SNS message points at.

    The URL comes from the (still unverified) message body, so it is checked
    before any request is made: it must use https, its host must be an SNS
    endpoint under amazonaws.com, and its path must end in ".pem". Without
    this check a forged message could point at an attacker-controlled
    certificate and pass signature verification.

    Args:
        url: Value of the message's "SigningCertURL" field.

    Returns:
        The certificate body decoded to ``str``.

    Raises:
        ValueError: If ``url`` is not a trusted SNS certificate URL.
        RuntimeError: If the server answers with a status other than 200.
    """
    # Local imports keep this block self-contained with respect to the
    # file's top-level import list.
    import re
    from urllib.parse import urlsplit

    if not isinstance(url, str):
        raise ValueError(f"Untrusted signing cert URL: {url!r}")

    parts = urlsplit(url)
    # ``hostname`` is lower-cased and excludes any userinfo/port, so tricks
    # such as "https://sns.x.amazonaws.com@evil.example/..." are rejected.
    host = parts.hostname or ""
    is_sns_host = re.fullmatch(
        r"sns\.[a-z0-9-]{3,}\.amazonaws\.com(\.cn)?", host
    )
    if (
        parts.scheme != "https"
        or is_sns_host is None
        or not parts.path.endswith(".pem")
    ):
        raise ValueError(f"Untrusted signing cert URL: {url!r}")

    # Bound the wait so a stalled download cannot hang the request handler.
    with urlopen(url, timeout=10) as stream:
        if stream.getcode() != 200:
            raise RuntimeError("Could not load aws signing cert!")
        return stream.read().decode()
def get_signing_keys(message):
    """Return the ordered field names that make up the signed string.

    The list depends on the message's "Type" value. For a "Notification",
    "Subject" is included only when that key is present in ``message``.

    Args:
        message: Mapping holding the parsed SNS message.

    Returns:
        A new list of field names in signing order.

    Raises:
        Exception: If "Type" is missing or not a recognised message type.
    """
    kind = message.get("Type")

    if kind == "Notification":
        fields = ["Message", "MessageId", "Timestamp", "TopicArn", "Type"]
        if "Subject" in message:
            # "Subject" sits between "MessageId" and "Timestamp".
            fields.insert(2, "Subject")
        return fields

    if kind in {"SubscriptionConfirmation", "UnsubscribeConfirmation"}:
        return [
            "Message",
            "MessageId",
            "SubscribeURL",
            "Timestamp",
            "Token",
            "TopicArn",
            "Type",
        ]

    raise Exception(f"Unknown message type {kind}")
def build_signature_string(message):
    """Build the newline-delimited text whose signature is checked.

    For every field name returned by ``get_signing_keys`` the name and then
    the field's value are emitted, each followed by a newline.

    Args:
        message: Mapping holding the parsed SNS message.

    Returns:
        The string to verify, ending with a trailing newline.

    Raises:
        KeyError: If a required field is absent from ``message``.
    """
    # Flatten (name, value) pairs into one sequence: name, value, name, ...
    pieces = [
        piece
        for field in get_signing_keys(message)
        for piece in (field, message[field])
    ]
    return "\n".join(pieces) + "\n"
def verify_sns_message_signature(message):
    """Verify the signature carried by an SNS message.

    The digest is chosen from "SignatureVersion": "1" is verified with SHA1
    and "2" with SHA256. The certificate is fetched via
    ``get_aws_signing_cert`` using the message's "SigningCertURL", which is
    passed through unchanged, and the signed text comes from
    ``build_signature_string``.

    Args:
        message: Mapping holding the parsed SNS message. Must contain
            "SignatureVersion", "SigningCertURL" and "Signature" in addition
            to the fields being signed.

    Returns:
        None. Returning without an exception means the signature is valid.

    Raises:
        Exception: If the signature version is unsupported or the signature
            does not match.
        KeyError: If a required field is absent from ``message``.
    """
    version = message["SignatureVersion"]
    # Check the version first so unsupported messages are rejected before
    # any network fetch is attempted.
    if version == "1":
        digest = "sha1"
    elif version == "2":
        digest = "sha256"
    else:
        raise Exception(f"Wrong signature version {version}")

    pem = get_aws_signing_cert(message["SigningCertURL"])
    sig_string = build_signature_string(message)

    pubkey = X509.load_cert_string(pem).get_pubkey()
    pubkey.reset_context(md=digest)
    pubkey.verify_init()
    pubkey.verify_update(sig_string.encode("utf-8"))
    # verify_final returns 1 only on a successful match.
    if pubkey.verify_final(b64decode(message["Signature"])) != 1:
        raise Exception("Signature could not be verified")
@bp.route("/events", methods=["POST"])
def blast_events_webhook():
    """Receive an SNS HTTP(S) delivery and echo back the verified message.

    The "X-Amz-Sns-Message-Type" header must be "SubscriptionConfirmation"
    or "Notification". The request body is then parsed as JSON and its
    signature checked with ``verify_sns_message_signature``.

    Returns:
        A JSON response containing the parsed message.

    Raises:
        BadRequest: If the message-type header holds an unexpected value.
        Unauthorized: If the body cannot be parsed or verification fails
            for any reason.
    """
    # NOTE(review): a missing header raises from the headers lookup itself;
    # presumably werkzeug turns that into a 400 -- confirm.
    message_type_header = request.headers["X-Amz-Sns-Message-Type"]
    accepted_types = {"SubscriptionConfirmation", "Notification"}
    if message_type_header not in accepted_types:
        raise BadRequest("Unknown message type headers.")

    try:
        raw_body = request.data.decode("utf-8")
        message = json.loads(raw_body)
        verify_sns_message_signature(message)
    except Exception as exc:
        # Any failure (decode, parse, fetch, verify) is reported as 401.
        logger.warning(str(exc))
        raise Unauthorized("Signature could not be verified")
    else:
        return jsonify(message)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment