Skip to content

Instantly share code, notes, and snippets.

@andrewgross
Last active November 5, 2020 15:10
Show Gist options
  • Save andrewgross/8ba32af80ecccb894b82774782e7dcd4 to your computer and use it in GitHub Desktop.
Save andrewgross/8ba32af80ecccb894b82774782e7dcd4 to your computer and use it in GitHub Desktop.
Django View to check travis CI webhook signatures. Requires Django, python-requests and pyOpenSSL packages
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import base64
import json
import logging
from urlparse import parse_qs
import requests
from OpenSSL.crypto import verify, load_publickey, FILETYPE_PEM, X509
from OpenSSL.crypto import Error as SignatureError
from django.conf import settings
from django.views.generic import View
from django.http import HttpResponseBadRequest, JsonResponse
logger = logging.getLogger(__name__)
class Travis(View):
    """
    Django view that receives Travis CI webhook notifications and verifies
    their signature before accepting them.

    Travis POSTs a urlencoded form whose ``payload`` field contains the JSON
    encoded notification. The base64 encoded signature of that payload is read
    from ``request.META['HTTP_SIGNATURE']`` and checked against the public key
    published at ``TRAVIS_CONFIG_URL``.

    Every failure is answered with an HTTP 400 whose body is a JSON object
    with a ``status`` key (``failed`` or ``unauthorized``).
    """

    # Make sure you use the correct config URL, the .org and .com
    # have different keys!
    # https://api.travis-ci.org/config
    # https://api.travis-ci.com/config
    TRAVIS_CONFIG_URL = settings.TRAVIS_CONFIG_URL

    def post(self, request, *args, **kwargs):
        """
        Handle a single webhook delivery.

        Returns a ``JsonResponse`` with ``{'status': 'received'}`` when the
        signature is valid, otherwise an ``HttpResponseBadRequest`` carrying
        a JSON encoded ``status``.
        """
        try:
            signature = self._get_signature(request)
            json_payload = parse_qs(request.body)['payload'][0]
        except (KeyError, TypeError, ValueError) as e:
            # KeyError: the Signature header or the payload field is missing.
            # TypeError / ValueError: the header is not valid base64 (the
            # exception type differs between Python 2 and Python 3).
            logger.warning({"message": "Malformed Travis CI webhook request", 'error': str(e)})
            return self._bad_request('failed')

        try:
            public_key = self._get_travis_public_key()
        except requests.Timeout:
            # Must come first: Timeout is a subclass of RequestException.
            logger.error({"message": "Timed out when attempting to retrieve Travis CI public key"})
            return self._bad_request('failed')
        except requests.RequestException as e:
            # str(e) instead of e.message: the attribute is deprecated on
            # Python 2 and does not exist on Python 3.
            logger.error({"message": "Failed to retrieve Travis CI public key", 'error': str(e)})
            return self._bad_request('failed')
        except (KeyError, TypeError, ValueError) as e:
            # The config endpoint answered, but not with the expected JSON
            # structure.
            logger.error({"message": "Unexpected response from Travis CI config endpoint", 'error': str(e)})
            return self._bad_request('failed')

        try:
            self.check_authorized(signature, public_key, json_payload)
        except SignatureError:
            logger.warning({"message": "Travis CI webhook signature verification failed"})
            return self._bad_request('unauthorized')

        try:
            json_data = json.loads(json_payload)
        except ValueError as e:
            logger.warning({"message": "Travis CI webhook payload is not valid JSON", 'error': str(e)})
            return self._bad_request('failed')

        # json_data now holds the verified notification; application specific
        # handling of it belongs here.
        return JsonResponse({'status': 'received'})

    def check_authorized(self, signature, public_key, payload):
        """
        Convert the PEM encoded public key to a format palatable for pyOpenSSL,
        then verify the signature.

        Returns nothing on success. Raises ``OpenSSL.crypto.Error`` (imported
        in this module as ``SignatureError``) when verification fails.
        """
        pkey_public_key = load_publickey(FILETYPE_PEM, public_key)
        # verify() expects a certificate, so wrap the bare public key in an
        # otherwise empty X509 object.
        certificate = X509()
        certificate.set_pubkey(pkey_public_key)
        # str() forces a native string for the digest name despite
        # unicode_literals.
        verify(certificate, signature, payload, str('sha1'))

    def _get_signature(self, request):
        """
        Extract the raw bytes of the request signature provided by travis.

        Raises ``KeyError`` when the header is absent and ``TypeError`` or
        ``ValueError`` (depending on the Python version) when it is not valid
        base64.
        """
        signature = request.META['HTTP_SIGNATURE']
        return base64.b64decode(signature)

    def _get_travis_public_key(self):
        """
        Returns the PEM encoded public key from the Travis CI /config endpoint.

        Raises ``requests.RequestException`` on transport or HTTP errors, and
        ``KeyError`` / ``TypeError`` / ``ValueError`` when the response body
        does not have the expected JSON structure.
        """
        response = requests.get(self.TRAVIS_CONFIG_URL, timeout=10.0)
        response.raise_for_status()
        return response.json()['config']['notifications']['webhook']['public_key']

    def _bad_request(self, status):
        """
        Build an HTTP 400 response whose body is ``{"status": <status>}``.

        The dict is serialized explicitly: handing a dict straight to
        ``HttpResponseBadRequest`` makes Django iterate over it, so only the
        key name would end up in the body.
        """
        return HttpResponseBadRequest(
            json.dumps({'status': status}),
            content_type='application/json',
        )
@andrewgross
Copy link
Author

andrewgross commented Sep 8, 2016

Also, if you need to sign data for testing purposes, here's how you can do that (Don't trust this for generating real RSA keys!):

import base64
from OpenSSL.crypto import (
    sign, 
    load_privatekey, 
    FILETYPE_PEM, 
    TYPE_RSA,
    PKey, 
    dump_privatekey, 
    dump_publickey,
)

def generate_keys():
    """
    Create a fresh 2048 bit RSA key pair for use in tests.

    Returns a ``(public_key, private_key)`` tuple, both PEM encoded.
    """
    key_pair = PKey()
    key_pair.generate_key(TYPE_RSA, 2048)
    return (
        dump_publickey(FILETYPE_PEM, key_pair),
        dump_privatekey(FILETYPE_PEM, key_pair),
    )

def generate_signature(pem_private_key, content):
    """
    Sign ``content`` with a PEM encoded private key and return the signature
    base64 encoded.

    Intended for tests: paired with the matching public key it mimics the
    signature the travis API attaches to its requests.
    """
    signing_key = load_privatekey(FILETYPE_PEM, pem_private_key)
    # str() yields a native string for the digest name.
    digest_name = str('sha1')
    return base64.b64encode(sign(signing_key, content, digest_name))

@andrewgross
Copy link
Author

Added a function to extract the JSON data that is shown in the docs. It is actually passed as JSON encoded data, dumped into a dict under a payload key, THEN urlencoded for the POST. It's a bit confusing since the docs and other examples don't highlight it.

@andrewgross
Copy link
Author

andrewgross commented Sep 14, 2016

Fixed the body loading logic again, as the Sinatra example has a bug (I think). Also added note that you MUST use the correct .com or .org config endpoint, depending where the hook comes from.

EDIT: Their example is correct, I did not realize Sinatra auto unpacks based on the Content Type header.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment