Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Django View to check Travis CI webhook signatures. Requires the Django, python-requests and pyOpenSSL packages.
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import base64
import json
import logging
from urlparse import parse_qs
import requests
from OpenSSL.crypto import verify, load_publickey, FILETYPE_PEM, X509
from OpenSSL.crypto import Error as SignatureError
from django.conf import settings
from django.views.generic import View
from django.http import HttpResponseBadRequest, JsonResponse
logger = logging.getLogger(__name__)
class Travis(View):
    """
    Receive a Travis CI webhook notification and verify its signature.

    The request body is form-encoded with a single ``payload`` field holding
    the JSON document; the base64-encoded signature over that JSON arrives in
    the ``Signature`` header. The public key used for verification is fetched
    from the Travis CI ``/config`` endpoint on every request.

    NOTE(review): Travis cannot supply a CSRF token, so this view presumably
    has to be wrapped in ``csrf_exempt`` where it is routed -- confirm against
    the project's URLconf.
    """

    # Make sure you use the correct config URL, the .org and .com
    # have different keys!
    # https://api.travis-ci.org/config
    # https://api.travis-ci.com/config
    TRAVIS_CONFIG_URL = settings.TRAVIS_CONFIG_URL

    def post(self, request, *args, **kwargs):
        """
        Validate the webhook and acknowledge it.

        Returns a 200 JSON response ``{"status": "received"}`` on success and
        a 400 JSON response (``"failed"`` or ``"unauthorized"``) otherwise.
        """
        try:
            signature = self._get_signature(request)
        except (KeyError, TypeError, ValueError):
            # KeyError: header absent. TypeError (Python 2) or ValueError
            # (binascii.Error on Python 3): header is not valid base64.
            logger.warning({"message": "Missing or malformed Signature header"})
            return self._bad_request('unauthorized')

        try:
            json_payload = parse_qs(request.body)['payload'][0]
        except KeyError:
            logger.warning({"message": "Request body has no payload field"})
            return self._bad_request('failed')

        try:
            public_key = self._get_travis_public_key()
        except requests.Timeout:
            logger.error({"message": "Timed out when attempting to retrieve Travis CI public key"})
            return self._bad_request('failed')
        except requests.RequestException as e:
            # str(e) instead of e.message: the latter is deprecated and does
            # not exist on Python 3 exceptions.
            logger.error({"message": "Failed to retrieve Travis CI public key", 'error': str(e)})
            return self._bad_request('failed')
        except (KeyError, TypeError, ValueError) as e:
            # The config endpoint answered, but not with the expected JSON shape.
            logger.error({"message": "Failed to retrieve Travis CI public key", 'error': str(e)})
            return self._bad_request('failed')

        try:
            self.check_authorized(signature, public_key, json_payload)
        except SignatureError:
            logger.warning({"message": "Travis CI webhook signature verification failed"})
            return self._bad_request('unauthorized')

        try:
            json_data = json.loads(json_payload)
        except ValueError:
            logger.warning({"message": "Travis CI webhook payload is not valid JSON"})
            return self._bad_request('failed')

        self.handle_payload(json_data)
        return JsonResponse({'status': 'received'})

    def handle_payload(self, json_data):
        """
        Hook called with the decoded payload once the signature has been
        verified. Does nothing by default; override it to act on the build
        notification.
        """

    def check_authorized(self, signature, public_key, payload):
        """
        Convert the PEM encoded public key to a format palatable for pyOpenSSL,
        then verify the signature.

        Raises ``OpenSSL.crypto.Error`` when the key cannot be loaded or the
        signature does not match the payload.
        """
        pkey_public_key = load_publickey(FILETYPE_PEM, public_key)
        # pyOpenSSL's verify() only accepts an X509 object, so wrap the bare
        # public key in an otherwise empty certificate.
        certificate = X509()
        certificate.set_pubkey(pkey_public_key)
        verify(certificate, signature, payload, str('sha1'))

    def _get_signature(self, request):
        """
        Extract the raw bytes of the request signature provided by travis.

        Raises ``KeyError`` when the header is missing and ``TypeError`` /
        ``ValueError`` when it is not valid base64.
        """
        signature = request.META['HTTP_SIGNATURE']
        return base64.b64decode(signature)

    def _get_travis_public_key(self):
        """
        Returns the PEM encoded public key from the Travis CI /config endpoint.

        Raises ``requests.RequestException`` on transport or HTTP errors and
        ``KeyError`` / ``TypeError`` / ``ValueError`` when the response body is
        not the expected JSON structure.
        """
        response = requests.get(self.TRAVIS_CONFIG_URL, timeout=10.0)
        response.raise_for_status()
        return response.json()['config']['notifications']['webhook']['public_key']

    @staticmethod
    def _bad_request(status):
        """
        Build a 400 response whose body is the JSON document
        ``{"status": <status>}``.

        HttpResponseBadRequest would iterate a dict and emit only its keys, so
        a JsonResponse with an explicit status code is used instead.
        """
        return JsonResponse({'status': status}, status=400)

joshk commented Sep 6, 2016

Awesome work! Feel free to send us a PR to our docs so we can link to this :)

Owner

andrewgross commented Sep 8, 2016

Also, if you need to sign data for testing purposes, here's how you can do that (don't trust this for generating real RSA keys!):

import base64
from OpenSSL.crypto import (
    sign, 
    load_privatekey, 
    FILETYPE_PEM, 
    TYPE_RSA,
    PKey, 
    dump_privatekey, 
    dump_publickey,
)

def generate_keys():
    """
    Create a fresh 2048-bit RSA key pair for tests.

    Returns a ``(public_key, private_key)`` tuple, both PEM encoded.
    """
    rsa_key = PKey()
    rsa_key.generate_key(TYPE_RSA, 2048)
    # Public half first, private half second -- callers unpack in that order.
    return (
        dump_publickey(FILETYPE_PEM, rsa_key),
        dump_privatekey(FILETYPE_PEM, rsa_key),
    )

def generate_signature(pem_private_key, content):
    """
    Sign ``content`` with the PEM encoded ``pem_private_key`` using SHA-1 and
    return the signature base64 encoded.

    Intended for tests: pair it with the matching public key to imitate the
    signature header sent by the Travis API.
    """
    signing_key = load_privatekey(FILETYPE_PEM, pem_private_key)
    raw_signature = sign(signing_key, content, str('sha1'))
    encoded_signature = base64.b64encode(raw_signature)
    return encoded_signature
Owner

andrewgross commented Sep 14, 2016

Added a function to extract the JSON data that is shown in the docs. It is actually passed as JSON-encoded data, dumped into a dict under a payload key, THEN urlencoded for the POST. It's a bit confusing since the docs and other examples don't highlight it.

Owner

andrewgross commented Sep 14, 2016

Fixed the body loading logic again, as the Sinatra example has a bug (I think). Also added note that you MUST use the correct .com or .org config endpoint, depending where the hook comes from.

EDIT: Their example is correct, I did not realize Sinatra auto unpacks based on the Content Type header.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment