Skip to content

Instantly share code, notes, and snippets.

@damienmarlier51
Last active June 6, 2020 06:03
Show Gist options
  • Save damienmarlier51/93b383ce4eb81c9d6c6e7df89dfc3402 to your computer and use it in GitHub Desktop.
Save damienmarlier51/93b383ce4eb81c9d6c6e7df89dfc3402 to your computer and use it in GitHub Desktop.
Validate AWS SNS notification
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from urllib.parse import urlparse
from cryptography import x509
import requests
import base64
def is_sns_notification_valid(body):
signing_cert_url = body["SigningCertURL"]
parsed_signing_cert_url = urlparse(signing_cert_url)
if parsed_signing_cert_url.scheme != 'https' \
or not parsed_signing_cert_url.netloc.endswith('.amazonaws.com'):
return False
signing_cert = requests.get(signing_cert_url).content
cert = x509.load_pem_x509_certificate(signing_cert, default_backend())
pubkey: rsa.RSAPublicKey = cert.public_key()
signature = base64.b64decode(body["Signature"])
signature_keys = ["Message",
"MessageId",
"Subject",
"Timestamp",
"TopicArn",
"Type"]
canonical_msg = "\n".join([f"{key}\n{body[key]}"
for key in signature_keys if key in body])
try:
pubkey.verify(
signature=signature,
data=canonical_msg.encode(),
padding=padding.PKCS1v15(),
algorithm=hashes.SHA1(),
)
except Exception:
return False
return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment