Created
January 30, 2020 13:16
-
-
Save mariusmitrofan/959536736df18743479a7c03566c69d1 to your computer and use it in GitHub Desktop.
Paddle Webhook Check for Python 3.7 in AWS Lambda behind AWS API Gateway
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import collections | |
import base64 | |
import json | |
import urllib.parse | |
# Crypto can be found at https://pypi.org/project/pycryptodome/ | |
from Crypto.PublicKey import RSA | |
from Crypto.Hash import SHA1 | |
from Crypto.Signature import PKCS1_v1_5 | |
import phpserialize | |
def _response(status_code, result, request_details):
    """Build an API Gateway Lambda-proxy response carrying a JSON body."""
    return {
        "statusCode": status_code,
        "body": json.dumps({
            "result": result,
            "request_details": request_details
        }),
        "headers": {
            'Content-Type': 'application/json',
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Credentials": True
        }
    }


def main(event, context):
    """Verify the ``p_signature`` of a Paddle webhook received via API Gateway.

    The form-encoded request body is parsed, ``p_signature`` is removed, the
    remaining fields are sorted by name and PHP-serialized, and the result is
    checked against the signature with RSA PKCS#1 v1.5 over SHA-1.

    Args:
        event: Lambda proxy event; ``event["body"]`` holds the
            ``application/x-www-form-urlencoded`` payload. When
            ``event["isBase64Encoded"]`` is truthy the body is base64-decoded
            first.
        context: Lambda context object (unused).

    Returns:
        A Lambda proxy response dict: status 200 with result ``"OK"`` when the
        signature verifies, otherwise status 400 with result
        ``"Signature could not be verified"``. Both echo the parsed fields
        (without ``p_signature``) under ``request_details``.
    """
    public_key = '''-----BEGIN PUBLIC KEY-----
MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAncWOfnvXciow60nwb7te
uwbluhc2WLdy8C3E4yf+gQEGjR+EXwDogWAmpJW0V3cRGhe41BBtO0vX39YeEjh3
tkCIT4JTkR4yCXiXJ/tYGvsCAwEAAQ==
-----END PUBLIC KEY-----'''
    failure = "Signature could not be verified"

    # API Gateway may deliver a null body, or a base64-encoded one.
    body = event.get("body") or ""
    if event.get("isBase64Encoded"):
        try:
            body = base64.b64decode(body).decode("utf-8")
        except ValueError:  # binascii.Error and UnicodeDecodeError
            return _response(400, failure, {})

    # keep_blank_values=True retains empty fields, which are part of the
    # signed payload; parse_qsl drops them by default. Values are already str.
    req = dict(urllib.parse.parse_qsl(body, keep_blank_values=True))

    # Pull the signature out of the signed field set; reject when absent/empty.
    signature = req.pop('p_signature', None)
    if not signature:
        return _response(400, failure, req)
    try:
        signature = base64.b64decode(signature)
    except ValueError:  # malformed base64 (binascii.Error)
        return _response(400, failure, req)

    # Convert key from PEM to DER: drop the BEGIN/END marker lines, join the
    # base64 payload lines, and decode.
    public_key_encoded = ''.join(
        line.strip()
        for line in public_key.splitlines()
        if line.strip() and not line.startswith('-----')
    )
    public_key_der = base64.b64decode(public_key_encoded)

    # Sort the fields by name and PHP-serialize them.
    sorted_data = collections.OrderedDict(sorted(req.items()))
    serialized_data = phpserialize.dumps(sorted_data)

    # Verify the SHA-1 digest of the serialized data against the signature.
    key = RSA.importKey(public_key_der)
    verifier = PKCS1_v1_5.new(key)
    digest = SHA1.new()
    digest.update(serialized_data)
    if verifier.verify(digest, signature):
        return _response(200, "OK", req)
    return _response(400, failure, req)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment