Create a gist now

Instantly share code, notes, and snippets.

Django View to check travis CI webhook signatures. Requires Django, python-requests and pyOpenSSL packages
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import base64
import json
import logging
from urlparse import parse_qs
import requests
from OpenSSL.crypto import verify, load_publickey, FILETYPE_PEM, X509
from OpenSSL.crypto import Error as SignatureError
from django.conf import settings
from django.views.generic import View
from django.http import HttpResponseBadRequest, JsonResponse
logger = logging.getLogger(__name__)
class Travis(View):
# Make sure you use the correct config URL, the .org and .com
# have different keys!
# https://api.travis-ci.org/config
# https://api.travis-ci.com/config
TRAVIS_CONFIG_URL = settings.TRAVIS_CONFIG_URL
def post(self, request, *args, **kwargs):
signature = self._get_signature(request)
json_payload = parse_qs(request.body)['payload'][0]
try:
public_key = self._get_travis_public_key()
except requests.Timeout:
logger.error({"message": "Timed out when attempting to retrieve Travis CI public key"})
return HttpResponseBadRequest({'status': 'failed'})
except requests.RequestException as e:
logger.error({"message": "Failed to retrieve Travis CI public key", 'error': e.message})
return HttpResponseBadRequest({'status': 'failed'})
try:
self.check_authorized(signature, public_key, json_payload)
except SignatureError:
# Log the failure somewhere
return HttpResponseBadRequest({'status': 'unauthorized'})
json_data = json.loads(json_payload)
return JsonResponse({'status': 'received'})
def check_authorized(self, signature, public_key, payload):
"""
Convert the PEM encoded public key to a format palatable for pyOpenSSL,
then verify the signature
"""
pkey_public_key = load_publickey(FILETYPE_PEM, public_key)
certificate = X509()
certificate.set_pubkey(pkey_public_key)
verify(certificate, signature, payload, str('sha1'))
def _get_signature(self, request):
"""
Extract the raw bytes of the request signature provided by travis
"""
signature = request.META['HTTP_SIGNATURE']
return base64.b64decode(signature)
def _get_travis_public_key(self):
"""
Returns the PEM encoded public key from the Travis CI /config endpoint
"""
response = requests.get(self.TRAVIS_CONFIG_URL, timeout=10.0)
response.raise_for_status()
return response.json()['config']['notifications']['webhook']['public_key']
@joshk
joshk commented Sep 6, 2016

Awesome work! Feel free to send us a PR to our docs so we can link to this :)

@andrewgross
Owner
andrewgross commented Sep 8, 2016 edited

Also, if you need to sign data for testing purposes, heres how you can do that (Don't trust this for generating real RSA keys!):

import base64
from OpenSSL.crypto import (
    sign, 
    load_privatekey, 
    FILETYPE_PEM, 
    TYPE_RSA,
    PKey, 
    dump_privatekey, 
    dump_publickey,
)

def generate_keys():
    """
    Generate a new RSA key, return the PEM encoded public and private keys 
    """
    pkey = PKey()
    pkey.generate_key(TYPE_RSA, 2048)
    public_key = dump_publickey(FILETYPE_PEM, pkey)
    private_key = dump_privatekey(FILETYPE_PEM, pkey)
    return public_key, private_key

def generate_signature(pem_private_key, content):
    """
    Given a private key and some content, generate a base64 encoded signature for that content.
    Use this during testing in combination with the public key to mimic the travis API.
    """
    private_key = load_privatekey(FILETYPE_PEM, pem_private_key)
    signature = sign(private_key, content, str('sha1'))
    return base64.b64encode(signature)
@andrewgross
Owner

Added a function to extra the JSON data that is shown in the docs. It is actually passed as JSON encoded data, dumped in to a dict under a payload key, THEN urlencoded for the POST. It's a bit confusing since the docs and other examples don't highlight it.

@andrewgross
Owner
andrewgross commented Sep 14, 2016 edited

Fixed the body loading logic again, as the Sinatra example has a bug (I think). Also added note that you MUST use the correct .com or .org config endpoint, depending where the hook comes from.

EDIT: Their example is correct, I did not realize Sinatra auto unpacks based on the Content Type header.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment