Last active
November 5, 2020 15:10
-
-
Save andrewgross/8ba32af80ecccb894b82774782e7dcd4 to your computer and use it in GitHub Desktop.
Django View to check Travis CI webhook signatures. Requires the Django, python-requests and pyOpenSSL packages.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from __future__ import unicode_literals | |
import base64 | |
import json | |
import logging | |
from urlparse import parse_qs | |
import requests | |
from OpenSSL.crypto import verify, load_publickey, FILETYPE_PEM, X509 | |
from OpenSSL.crypto import Error as SignatureError | |
from django.conf import settings | |
from django.views.generic import View | |
from django.http import HttpResponseBadRequest, JsonResponse | |
logger = logging.getLogger(__name__) | |
class Travis(View):
    """
    Receive Travis CI webhook notifications and verify their signature.

    The POST body is form-urlencoded with a single ``payload`` field holding
    the JSON document; the ``Signature`` header holds the base64 encoded
    signature of that JSON text. The signature is checked against the public
    key published at ``TRAVIS_CONFIG_URL``.

    NOTE(review): Django's CSRF middleware normally rejects external POSTs,
    so this view presumably has to be wrapped with ``csrf_exempt`` where it
    is routed -- confirm against the project's URL configuration.
    """

    # Make sure you use the correct config URL, the .org and .com
    # have different keys!
    # https://api.travis-ci.org/config
    # https://api.travis-ci.com/config
    TRAVIS_CONFIG_URL = settings.TRAVIS_CONFIG_URL

    def post(self, request, *args, **kwargs):
        """
        Validate the webhook signature and acknowledge the notification.

        Returns a 400 response with a JSON body of ``{"status": "failed"}``
        when the request is malformed or the public key cannot be fetched,
        ``{"status": "unauthorized"}`` when the signature does not verify,
        and a 200 ``{"status": "received"}`` response otherwise.
        """
        try:
            signature = self._get_signature(request)
            json_payload = parse_qs(request.body)['payload'][0]
        except (KeyError, TypeError, ValueError):
            # KeyError: missing Signature header or payload field.
            # TypeError / ValueError: the header is not valid base64
            # (Python 2 and Python 3 raise different exception types).
            logger.warning({"message": "Malformed Travis CI webhook request"})
            return self._bad_request('failed')

        try:
            public_key = self._get_travis_public_key()
        except requests.Timeout:
            logger.error({"message": "Timed out when attempting to retrieve Travis CI public key"})
            return self._bad_request('failed')
        except requests.RequestException as e:
            # str(e) instead of e.message: the attribute is deprecated on
            # Python 2 and does not exist on Python 3.
            logger.error({"message": "Failed to retrieve Travis CI public key", 'error': str(e)})
            return self._bad_request('failed')
        except (KeyError, TypeError, ValueError) as e:
            # The config endpoint answered, but not with the expected JSON
            # structure.
            logger.error({"message": "Failed to retrieve Travis CI public key", 'error': str(e)})
            return self._bad_request('failed')

        try:
            self.check_authorized(signature, public_key, json_payload)
        except SignatureError:
            logger.warning({"message": "Travis CI webhook signature verification failed"})
            return self._bad_request('unauthorized')

        try:
            json_data = json.loads(json_payload)
        except ValueError:
            logger.warning({"message": "Travis CI webhook payload is not valid JSON"})
            return self._bad_request('failed')

        self.handle_payload(json_data)
        return JsonResponse({'status': 'received'})

    def handle_payload(self, data):
        """
        Hook called with the decoded JSON payload of a verified notification.

        Does nothing by default; override it in a subclass to act on the
        build notification.
        """

    def check_authorized(self, signature, public_key, payload):
        """
        Convert the PEM encoded public key to a format palatable for pyOpenSSL,
        then verify the signature

        Raises ``OpenSSL.crypto.Error`` when the key cannot be loaded or the
        signature does not match the payload.

        NOTE(review): recent pyOpenSSL releases appear to deprecate
        ``crypto.verify`` -- confirm against the pinned pyOpenSSL version.
        """
        pkey_public_key = load_publickey(FILETYPE_PEM, public_key)
        # verify() only accepts an X509 object, so wrap the bare public key
        # in an otherwise empty certificate.
        certificate = X509()
        certificate.set_pubkey(pkey_public_key)
        # str() keeps the digest name a native string despite the module's
        # unicode_literals import.
        verify(certificate, signature, payload, str('sha1'))

    def _get_signature(self, request):
        """
        Extract the raw bytes of the request signature provided by travis

        Raises ``KeyError`` when the ``Signature`` header is absent and
        ``TypeError`` / ``ValueError`` when it is not valid base64.
        """
        signature = request.META['HTTP_SIGNATURE']
        return base64.b64decode(signature)

    def _get_travis_public_key(self):
        """
        Returns the PEM encoded public key from the Travis CI /config endpoint

        Raises ``requests.RequestException`` on network or HTTP errors and
        ``KeyError`` / ``ValueError`` when the response lacks the expected
        JSON structure.
        """
        response = requests.get(self.TRAVIS_CONFIG_URL, timeout=10.0)
        response.raise_for_status()
        return response.json()['config']['notifications']['webhook']['public_key']

    def _bad_request(self, status):
        """
        Build a 400 response whose body is the JSON object {"status": status}.

        The dict is serialised explicitly: handing a dict straight to an
        HttpResponse makes Django iterate it and emit only its keys.
        """
        return HttpResponseBadRequest(
            json.dumps({'status': status}),
            content_type='application/json',
        )
Added a function to extract the JSON data that is shown in the docs. It is actually passed as JSON-encoded data, placed in a dict under a payload key, THEN urlencoded for the POST. It's a bit confusing since the docs and other examples don't highlight it.
Fixed the body loading logic again, as the Sinatra example has a bug (I think). Also added a note that you MUST use the correct .com or .org config endpoint, depending on where the hook comes from.
EDIT: Their example is correct; I did not realize Sinatra auto-unpacks the body based on the Content-Type header.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Also, if you need to sign data for testing purposes, here's how you can do that (don't trust this for generating real RSA keys!):