Created
March 30, 2017 01:31
-
-
Save xfxf/65eaf2415f9f49d99847b07795c5a13d to your computer and use it in GitHub Desktop.
stripe-pinax webhook parser
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# Script to test stripe webhooks in development. | |
# Should be run from a local machine (not inside of Docker). | |
# | |
# To use, create a requestb.in, ensure this is configured in Stripe (test account), | |
# and pass the key (not the URL) to this as an argument. Errors (HTTP error code) | |
# the first run are normal. | |
# | |
# If you're getting a requests SSL error, try upgrading requests: | |
# $ sudo pip install --upgrade requests | |
from __future__ import unicode_literals | |
import requests | |
import json | |
import argparse | |
import sys | |
from requests.structures import CaseInsensitiveDict as cidict | |
LOCAL_URL = 'https://url-of-application/webhook/' # URL of pinax-stripe webhook | |
parser = argparse.ArgumentParser( | |
description='Used to bridge Stripe (test mode) and local development; parses all data from a requestb.in. ' | |
'Note this may need to be run twice; first time might error due to Customer create events being last.' | |
) | |
parser.add_argument('key', help='key of a created requestb.in to parse') | |
def post_to_django(data): | |
print("Processing request ID: {}".format(data['id'])) | |
r = requests.post(LOCAL_URL, | |
json=data, | |
headers={'X-Requested-With': 'XMLHttpRequest'}, | |
verify=False) | |
print(r.content) | |
print("HTTP RESPONSE CODE: {}\n\n".format(r.status_code)) | |
if __name__ == '__main__': | |
try: | |
args = parser.parse_args() | |
except: | |
parser.print_help() | |
sys.exit(0) | |
requests.packages.urllib3.disable_warnings() | |
REQUESTBIN_API = "https://requestb.in/api/v1/bins/{binkey}/requests".format(binkey=args.key) | |
print("Parsing {}...".format(REQUESTBIN_API)) | |
response = requests.get( | |
url=REQUESTBIN_API, | |
headers=cidict( | |
{'content-type': 'application/json', | |
'accept': 'application/json'} | |
)) | |
jsondata = response.json() | |
if jsondata == []: | |
print("No data.") | |
exit() | |
try: | |
for item in jsondata: | |
if item['method'] == 'POST': | |
post_to_django(json.loads(item['body'])) | |
except TypeError: | |
print("Error reading data.") | |
exit() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment