Skip to content

Instantly share code, notes, and snippets.

@sebschrader
Last active October 5, 2016 14:18
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sebschrader/c3a95b2bd19af525cdedd0387657e694 to your computer and use it in GitHub Desktop.
Save sebschrader/c3a95b2bd19af525cdedd0387657e694 to your computer and use it in GitHub Desktop.
GitHub Release Hook to run Jekyll
# SECRET must be a bytes object
SECRET = b"correcthorsebatterystaple"
TARGET_DIR = "/var/www/htdocs/my-site"
REPO_DIR = "/var/lib/my-repo.git"
#!/usr/bin/python3 -u
import argparse
import collections
import functools
import hashlib
import hmac
import io
import json
import os
import subprocess
import sys
import time
import urllib.parse
import wsgiref.simple_server
class HTTPError(Exception):
def __init__(self, code, message=None):
super().__init__(code, message)
self.code = code
self.message = message
class HTTPBadRequestError(HTTPError):
def __init__(self, message=None):
super().__init__(400, message)
class HTTPUnauthorizedError(HTTPError):
def __init__(self, message=None):
super().__init__(401, message)
class HTTPForbiddenError(HTTPError):
def __init__(self, message=None):
super().__init__(403, message)
class HTTPNotFoundError(HTTPError):
def __init__(self, message=None):
super().__init__(404, message)
class HTTPMethodNotAllowedError(HTTPError):
def __init__(self, message=None):
super().__init__(405, message)
class HTTPInternalServerError(HTTPError):
def __init__(self, message=None):
super().__init__(500, message)
class HTTPNotImplementedError(HTTPError):
def __init__(self, message=None):
super().__init__(501, message)
class Headers(collections.Mapping):
def __init__(self, environ):
super().__init__()
self.environ = environ
@staticmethod
def _cgi_name(key):
if not isinstance(key, str):
return key
cgi_name = key.upper().replace('-', '_')
if cgi_name not in {'CONTENT_LENGTH', 'CONTENT_TYPE'}:
return 'HTTP_' + cgi_name
return cgi_name
def __getitem__(self, key):
cgi_name = self._cgi_name(key)
try:
return self.environ[cgi_name]
except KeyError:
raise HTTPBadRequestError("Missing header {}".format(key))
def __contains__(self, key):
cgi_name = self._cgi_name(key)
return cgi_name in self.environ
def __iter__(self):
for key, value in self.environ.items():
if key.startswith('HTTP_') and key not in {'HTTP_CONTENT_LENGTH',
'HTTP_CONTENT_TYPE'}:
yield key[5:].title().replace('_', '-')
elif key in {'CONTENT_LENGTH', 'CONTENT_TYPE'}:
yield key.title().replace('_', '-')
def __len__(self):
return len(iter(self))
class BaseRequestHandler:
"""
Webhook that receives and verifies any GitHub event.
To handle an event, subclass this event and implement
handle_{event}_event methods for all events you want to handle.
The handle methods should either return a (code, dict) tuple that will be
used to return a JSON response or raise an AbortError.
"""
default_encoder = functools.partial(json.dumps, indent=2)
reason_phrases = {
100: "Continue",
200: "OK",
400: "Bad Request",
401: "",
402: "",
403: "Forbidden",
404: "Not Found",
500: "Internal Error",
}
def __init__(self, environ, start_response, *, secret):
self.headers = Headers(environ)
self.environ = environ
self.start_response = start_response
self.input = environ['wsgi.input']
self.errors = environ['wsgi.errors']
self.request_method = environ['REQUEST_METHOD']
self.remote_addr = environ['REMOTE_ADDR']
self.remote_port = environ['REMOTE_PORT']
self.server_name = environ['SERVER_NAME']
self.server_port = environ['SERVER_PORT']
self.server_protocol = environ['SERVER_PROTOCOL']
self.script_name = environ['SCRIPT_NAME']
self.path_info = environ['PATH_INFO']
self.query_string = environ['QUERY_STRING']
self.secret = secret
def send_error(self, code, message=None, exc_info=None):
"""
Send an error response.
:param code: HTTP status code
:param message: An error message string
:param exc_info: Exception info
"""
response = {'message': message}
yield from self.send_json(code, response, exc_info)
def send_json(self, code, obj, exc_info=None, encoder=default_encoder):
"""
Send a JSON response
:param code: HTTP status code
:param obj: Python object to serialize
:param encoder: Callable that takes a Python object and returns the
encoded JSON as str
"""
json = encoder(obj)
content = json.encode('utf-8', 'replace')
try:
reason_phrase = self.reason_phrases[code]
except KeyError:
status = str(code)
else:
status = "{:d} {}".format(code, reason_phrase)
self.start_response(status, [
('Content-Type', 'application/json'),
('Content-Length', str(len(content))),
], exc_info)
if self.request_method != 'HEAD' and code >= 200 and code not in (204, 304):
yield content
else:
yield b''
KNOWN_METHODS = {'OPTIONS', 'GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'TRACE', 'CONNECT'}
def handle_request(self):
try:
try:
method_handler = getattr(self, 'do_{}'.format(self.request_method))
except AttributeError as e:
if self.request_method not in self.KNOWN_METHODS:
raise HTTPNotImplementedError("Unknown HTTP method {}"
.format(self.method))
else:
raise HTTPMethodNotAllowedError("HTTP method {} is not "
"supported"
.format(self.method))
code, response = method_handler()
except HTTPError as e:
return self.send_error(e.code, e.message)
except Exception as e:
import traceback
print('-' * 78, file=self.errors)
print('Exception happened during processing of request from '
'{}:{}:'
.format(self.remote_addr, self.remote_port),
file=self.errors)
traceback.print_exc(file=self.errors)
print('-' * 78, file=self.errors)
self.errors.flush()
return self.send_error(500, "An unexpected error occurred.", sys.exc_info())
else:
return self.send_json(code, response)
def do_GET(self):
return 200, {'supported_events': [
attr[7:-6]
for attr in dir(self)
if attr.startswith('handle_') and attr.endswith('_event')
]}
do_HEAD = do_GET
@staticmethod
def verify_signature(secret, signature, content):
algorithm, sep, digest = signature.partition('=')
if sep == '':
raise HTTPBadRequestError("Invalid X-Hub-Signature format")
if algorithm not in hashlib.algorithms_available:
raise HTTPNotImplementedError("Unsupported digest {}".format(algorithm))
digestmod = getattr(hashlib, algorithm)
mac = hmac.new(secret, content, digestmod)
if not hmac.compare_digest(mac.hexdigest(), digest):
raise HTTPUnauthorizedError("Signature verification failed")
@staticmethod
def decode_urlencoded_content(content):
try:
decoded_content = content.decode('ascii')
except UnicodeDecodeError as e:
raise HTTPBadRequestError("Could not decode content as ASCII") from e
try:
form = dict(urllib.parse.parse_qsl(decoded_content,
strict_parsing=True,
errors='strict'))
except ValueError as e:
raise HTTPBadRequestError("Content is not valid "
"application/x-www-form-urlencoded form "
"data") from e
try:
return form['payload']
except KeyError as e:
raise HTTPBadRequestError("Missing form element 'payload'") from e
@staticmethod
def decode_json_content(content):
try:
return content.decode('utf-8')
except UnicodeDecodeError as e:
raise HTTPBadRequestError("Could not decode content as UTF-8") from e
def do_POST(self):
if 'Content-Length' not in self.headers:
content_length = -1
else:
try:
content_length = int(self.headers['Content-Length'])
except ValueError as e:
raise HTTPBadRequestError("Invalid Content-Length") from e
if content_length < 0:
raise HTTPBadRequestError("Invalid Content-Length")
content = self.input.read(content_length)
if len(content) < content_length:
raise HTTPBadRequestError("Content was shorter than Content-Length {} "
"< {}".format(len(content), content_length))
self.verify_signature(self.secret, self.headers['X-Hub-Signature'],
content)
content_type = self.headers['Content-Type']
if content_type == 'application/json':
payload = self.decode_json_content(content)
elif content_type == 'application/x-www-form-urlencoded':
payload = self.decode_urlencoded_content(content)
else:
raise HTTPBadRequestError("Content-Type must be either application/"
"json or application/x-www-form-urlencoded")
try:
message = json.loads(payload)
except ValueError as e:
raise HTTPBadRequestError("Payload is not a valid JSON document") from e
event = self.headers['X-GitHub-Event']
delivery = self.headers['X-GitHub-Delivery']
try:
event_handler = getattr(self, "handle_{}_event".format(event))
except AttributeError as e:
raise HTTPBadRequestError("Unsupported event {}".format(event)) from e
return event_handler(delivery, message)
class PingHandler(BaseRequestHandler):
def handle_ping_event(self, delivery, message):
return 200, {'message': 'Ping OK'}
class WSGIApp:
def __init__(self, handler_class=PingHandler, handler_kwargs=None):
self.handler_class = handler_class
if handler_kwargs is None:
handler_kwargs = {}
self.handler_kwargs = handler_kwargs
def __call__(self, environ, start_response):
request_handler = self.handler_class(environ, start_response,
**self.handler_kwargs)
return request_handler.handle_request()
class JekyllBuildHandler(PingHandler):
def __init__(self, environ, start_response, *, secret, repo_dir, target_dir):
super().__init__(environ, start_response, secret=secret)
self.repo_dir = repo_dir
self.target_dir = target_dir
def handle_release_event(self, delivery, message):
try:
tag_name = message['release']['tag_name']
except KeyError as e:
raise HTTPNotFoundError("Missing key 'release'->'tag_name'") from e
commands = [
self.run(["git", "fetch", "--all"]),
self.run(["git", "submodule", "update", "--init", "--checkout",
"--force", "--recursive"]),
self.run(["git", "checkout", "tags/{}".format(tag_name)]),
self.run(["bundle", "exec", "jekyll", "build", "-d", self.target_dir]),
]
return 200, {'message': 'Build successful', 'commands': commands}
def run(self, args):
try:
with subprocess.Popen(args, stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
close_fds=True, shell=False, cwd=self.repo_dir,
universal_newlines=True,
restore_signals=True) as process:
try:
stdout, stderr = process.communicate(timeout=60)
except subprocess.TimeoutExpired as e:
process.kill()
stdout, stderr = process.communicate()
raise HTTPInternalServerError(
"Command '{}' took more than {}s to complete:\n{}"
.format(process.args, e.timeout, stdout))
if process.returncode != 0:
raise HTTPInternalServerError(
"Command '{}' returned non-zero exit status {}:\n{}"
.format(process.args, process.returncode, stdout))
return {'cmd': process.args, 'output': stdout}
except (OSError, subprocess.SubprocessError) as e:
raise HTTPInternalServerError(500, str(e)) from e
def main():
argparser = argparse.ArgumentParser(description='Github hook handler')
argparser.add_argument('-b', '--bind', help='Address to bind to', default="")
argparser.add_argument('-p', '--port', type=int, help='Port to listen on', default=8080)
args = argparser.parse_args()
import config
app = WSGIApp(JekyllBuildHandler, handler_kwargs={
'secret': config.SECRET,
'repo_dir': config.REPO_DIR,
'target_dir': config.TARGET_DIR})
server = wsgiref.simple_server.make_server(args.bind, args.port, app)
try:
server.serve_forever()
except KeyboardInterrupt:
pass
server.server_close()
return os.EX_OK
if __name__ == '__main__':
sys.exit(main())
import hook
import config
application = hook.WSGIApp(hook.JekyllBuildHandler, handler_kwargs={
'secret': config.SECRET,
'repo_dir': config.REPO_DIR,
'target_dir': config.TARGET_DIR})
import concurrent.futures
import hashlib
import hmac
import io
import json
import os
import random
import socket
import string
import subprocess
import sys
import tempfile
import time
import unittest.mock
import urllib.parse
import uuid
import wsgiref.util
import wsgiref.validate
import pytest
import hook
@pytest.yield_fixture
def server():
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
env = {
'SECRET': SECRET,
'REPO_DIR': '/tmp',
'TARGET_DIR': '/tmp',
}
argv = ["./hook.py", "--bind", HOST, "--port", str(PORT)]
patch = unittest.mock.patch
class TestingHandler(hook.JekyllBuildHandler):
secret = SECRET
repo_dir = '/tmp'
target_dir = '/tmp'
server = http.server.HTTPServer((HOST, PORT), TestingHandler)
with executor:
future = executor.submit(server.serve_forever)
yield future, TestingHandler
if not future.done():
server.shutdown()
future.result()
@pytest.fixture
def repo_dir():
return '/tmp'
@pytest.fixture
def target_dir():
return '/tmp'
@pytest.fixture
def secret():
gen = random.SystemRandom()
return ''.join(gen.choice(string.ascii_letters)
for i in range(20)).encode('ascii')
@pytest.fixture
def app(secret, repo_dir, target_dir):
app = hook.WSGIApp(hook.JekyllBuildHandler, handler_kwargs={
'secret': secret,
'repo_dir': repo_dir,
'target_dir': target_dir,
})
return wsgiref.validate.validator(app)
def assert_response_headers(headers):
headers = dict(headers)
assert 'Content-Length' in headers
assert 'Content-Type' in headers
assert headers['Content-Type'] == 'application/json'
def assert_json_response(expected_code, response):
code, reason, headers, content = response
assert_response_headers(headers)
assert isinstance(json.loads(content.decode('utf-8')), dict)
assert expected_code == code
def create_message(secret, event, message, form_encoded=False):
payload = json.dumps(message)
if form_encoded:
content_type = 'application/x-www-form-urlencoded'
content = urllib.parse.urlencode({'payload': payload},
errors='strict').encode('utf-8')
else:
content_type = 'application/json'
content = payload.encode('utf-8')
mac = hmac.new(secret, content, digestmod=hashlib.sha1)
headers = {
'X-GitHub-Delivery': str(uuid.uuid4()),
'X-GitHub-Event': event,
'X-Hub-Signature': "sha1=" + mac.hexdigest(),
'Content-Type': content_type,
}
return headers, content
def make_request(app, method, headers, content):
environ = {
('' if name in {'Content-Length', 'Content-Type'} else 'HTTP_') +
name.upper().replace('-', '_'): value
for name, value in headers.items()
}
errors = io.StringIO()
output = io.BytesIO()
input = io.BytesIO(content)
environ.update({
'REMOTE_ADDR': '127.0.0.1',
'REMOTE_PORT': '65534',
'SERVER_ADDR': '127.0.0.1',
'SERVER_NAME': 'localhost',
'SERVER_PORT': '8080',
'SERVER_SOFTWARE': 'test_hook.py',
'REQUEST_METHOD': method,
'QUERY_STRING': '',
'wsgi.input': input,
'wsgi.errors': errors,
})
wsgiref.util.setup_testing_defaults(environ)
saved_status_code = None
saved_reason_phrase = None
saved_response_headers = None
saved_exc_info = None
def start_response(status, response_headers, exc_info=None):
nonlocal saved_status_code, saved_reason_phrase, \
saved_response_headers, saved_exc_info
code, sep, saved_reason_phrase = status.partition(' ')
saved_status_code = int(code)
saved_response_headers = response_headers
m = output.getbuffer()
size = m.nbytes
m.release()
if size > 0:
raise exc_info[1].with_traceback(exc_info[2])
saved_exc_info = exc_info
return output.write
try:
chunks = app(environ, start_response)
for chunk in chunks:
output.write(chunk)
chunks.close()
except:
raise
finally:
sys.stderr.write(errors.getvalue())
sys.stderr.flush()
response_content = output.getvalue()
if saved_exc_info:
raise saved_exc_info[1].with_traceback(saved_exc_info[2])
return saved_status_code, saved_reason_phrase, saved_response_headers, \
response_content
def test_get(app):
assert_json_response(200, make_request(app, "GET", {}, b''))
def test_head(app):
code, reason, headers, content = make_request(app, "HEAD", {}, b'')
assert_response_headers(headers)
assert code == 200
assert content == b''
@pytest.mark.parametrize('form_encoded', [
True,
False,
])
def test_valid_ping(app, secret, form_encoded):
headers, content = create_message(secret, 'ping', {}, form_encoded)
assert_json_response(200, make_request(app, "POST", headers, content))
def test_invalid_event(app, secret):
headers, content = create_message(secret, 'invalid', {})
assert_json_response(400, make_request(app, "POST", headers, content))
@pytest.mark.parametrize('header', [
'X-GitHub-Delivery',
'X-GitHub-Event',
'X-Hub-Signature',
'Content-Type',
])
def test_missing_header(app, secret, header):
headers, content = create_message(secret, 'ping', {})
headers.pop(header)
assert_json_response(400, make_request(app, "POST", headers, content))
def test_invalid_signature(app, secret):
headers, content = create_message(secret, 'ping', {})
content = content + b'123'
assert_json_response(401, make_request(app, "POST", headers, content))
def test_valid_release(app, secret):
tag_name = 'foobar'
headers, content = create_message(secret, 'release', {'release': {'tag_name': tag_name}})
def Popen(args, **kwargs):
mock = unittest.mock.MagicMock(spec=subprocess.Popen, autospec=True)
attrs = {
'args': args,
'__enter__.return_value': mock,
'communicate.return_value': ('Mocked stdout output\n', None),
'returncode': 0,
}
mock.configure_mock(**attrs)
return mock
patched_Popen = unittest.mock.patch.object(
subprocess, 'Popen', autospec=True,
side_effect=Popen)
with patched_Popen:
response = make_request(app, "POST", headers, content)
assert_json_response(200, response)
kwargs = {
'cwd':'/tmp',
'stdin': subprocess.DEVNULL,
'stdout': subprocess.PIPE,
'stderr': subprocess.STDOUT,
'close_fds': True,
'restore_signals': True,
'shell': False,
'universal_newlines': True,
}
subprocess.Popen.assert_has_calls([
unittest.mock.call(
["git", "fetch", "--all"],
**kwargs),
unittest.mock.call(
["git", "submodule", "update", "--init", "--checkout",
"--force", "--recursive"],
**kwargs),
unittest.mock.call(
["git", "checkout", "tags/{}".format(tag_name)],
**kwargs),
unittest.mock.call(
["bundle", "exec", "jekyll", "build", "-d", "/tmp"],
**kwargs),
])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment