Skip to content

Instantly share code, notes, and snippets.

@xfxf
Created March 30, 2017 01:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save xfxf/65eaf2415f9f49d99847b07795c5a13d to your computer and use it in GitHub Desktop.
Save xfxf/65eaf2415f9f49d99847b07795c5a13d to your computer and use it in GitHub Desktop.
stripe-pinax webhook parser
#!/usr/bin/python
# Script to test stripe webhooks in development.
# Should be run from a local machine (not inside of Docker).
#
# To use, create a requestb.in, ensure this is configured in Stripe (test account),
# and pass the key (not the URL) to this as an argument. Errors (HTTP error code)
# the first run are normal.
#
# If you're getting a requests SSL error, try upgrading requests:
# $ sudo pip install --upgrade requests
from __future__ import unicode_literals
import requests
import json
import argparse
import sys
from requests.structures import CaseInsensitiveDict as cidict
LOCAL_URL = 'https://url-of-application/webhook/' # URL of pinax-stripe webhook
parser = argparse.ArgumentParser(
description='Used to bridge Stripe (test mode) and local development; parses all data from a requestb.in. '
'Note this may need to be run twice; first time might error due to Customer create events being last.'
)
parser.add_argument('key', help='key of a created requestb.in to parse')
def post_to_django(data):
print("Processing request ID: {}".format(data['id']))
r = requests.post(LOCAL_URL,
json=data,
headers={'X-Requested-With': 'XMLHttpRequest'},
verify=False)
print(r.content)
print("HTTP RESPONSE CODE: {}\n\n".format(r.status_code))
if __name__ == '__main__':
try:
args = parser.parse_args()
except:
parser.print_help()
sys.exit(0)
requests.packages.urllib3.disable_warnings()
REQUESTBIN_API = "https://requestb.in/api/v1/bins/{binkey}/requests".format(binkey=args.key)
print("Parsing {}...".format(REQUESTBIN_API))
response = requests.get(
url=REQUESTBIN_API,
headers=cidict(
{'content-type': 'application/json',
'accept': 'application/json'}
))
jsondata = response.json()
if jsondata == []:
print("No data.")
exit()
try:
for item in jsondata:
if item['method'] == 'POST':
post_to_django(json.loads(item['body']))
except TypeError:
print("Error reading data.")
exit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment