Skip to content

Instantly share code, notes, and snippets.

@mlafeldt
Created August 12, 2022 08:41
Show Gist options
  • Save mlafeldt/df0b2a71111e09d66e078a22abcf9dd4 to your computer and use it in GitHub Desktop.
Save mlafeldt/df0b2a71111e09d66e078a22abcf9dd4 to your computer and use it in GitHub Desktop.
Extracted from https://eventbridge-inbound-webhook-templates-prod-eu-central-1/lambda-templates/github-lambdasrc.zip
"""Webhook implementation for Github"""
import os
import json
import urllib.parse
import base64
import hmac
import hashlib
from cgi import parse_header
import boto3
import botocore
import botocore.session
from aws_secretsmanager_caching import SecretCache, SecretCacheConfig
client = botocore.session.get_session().create_client('secretsmanager')
cache_config = SecretCacheConfig()
cache = SecretCache(config=cache_config, client=client)
github_webhook_secret_arn = os.environ.get('GITHUB_WEBHOOK_SECRET_ARN')
event_bus_name = os.environ.get('EVENT_BUS_NAME', 'default')
event_bridge_client = boto3.client('events')
def _add_header(request, **kwargs):
userAgentHeader = request.headers['User-Agent'] + ' fURLWebhook/1.0 (Github)'
del request.headers['User-Agent']
request.headers['User-Agent'] = userAgentHeader
event_system = event_bridge_client.meta.events
event_system.register_first('before-sign.events.PutEvents', _add_header)
def lambda_handler(event, _context):
"""Webhook function"""
headers = event.get('headers')
# Input validation
try:
json_payload = get_json_payload(event=event)
except ValueError as err:
print_error(f'400 Bad Request - {err}', headers)
return {'statusCode': 400}
except BaseException as err: # Unexpected Error
print_error('500 Internal Server Error\n' +
f'Unexpected error: {err}, {type(err)}', headers)
return {'statusCode': 500}
detail_type = headers.get('x-github-event', 'github-webhook-lambda')
try:
if not contains_valid_signature(event=event):
print_error('401 Unauthorized - Invalid Signature', headers)
return {'statusCode': 401}
response = forward_event(json_payload, detail_type)
if response['FailedEntryCount'] > 0:
print_error('500 Internal Server Error - Failed to forward message to EventBridge\n' +
str(response['Entries'][0]), headers)
return {'statusCode': 500}
return {'statusCode': 202}
except BaseException as err: # Unexpected Error
print_error('500 Internal Server Error\n' +
f'Unexpected error: {err}, {type(err)}', headers)
return {'statusCode': 500}
def normalize_payload(raw_payload, is_base64_encoded):
"""Decode payload if needed"""
if raw_payload is None:
raise ValueError('Missing event body')
if is_base64_encoded:
return base64.b64decode(raw_payload).decode('utf-8')
return raw_payload
def contains_valid_signature(event):
"""Check for the payload signature
Github documention: https://docs.github.com/en/developers/webhooks-and-events/webhooks/securing-your-webhooks#validating-payloads-from-github
"""
secret = cache.get_secret_string(github_webhook_secret_arn)
payload_bytes = get_payload_bytes(
raw_payload=event['body'], is_base64_encoded=event['isBase64Encoded'])
computed_signature = compute_signature(
payload_bytes=payload_bytes, secret=secret)
return hmac.compare_digest(event['headers']['x-hub-signature-256'], computed_signature)
def get_payload_bytes(raw_payload, is_base64_encoded):
"""Get payload bytes to feed hash function"""
if is_base64_encoded:
return base64.b64decode(raw_payload)
else:
return raw_payload.encode()
def compute_signature(payload_bytes, secret):
"""Compute HMAC-SHA256"""
m = hmac.new(key=secret.encode(), msg=payload_bytes,
digestmod=hashlib.sha256)
return 'sha256=' + m.hexdigest()
def get_json_payload(event):
"""Get JSON string from payload"""
content_type = get_content_type(event.get('headers', {}))
if not (content_type == 'application/json' or
content_type == 'application/x-www-form-urlencoded'):
raise ValueError('Unsupported content-type')
payload = normalize_payload(
raw_payload=event.get('body'),
is_base64_encoded=event['isBase64Encoded'])
if content_type == 'application/x-www-form-urlencoded':
parsed_qs = urllib.parse.parse_qs(payload)
if 'payload' not in parsed_qs or len(parsed_qs['payload']) != 1:
raise ValueError('Invalid urlencoded payload')
payload = parsed_qs['payload'][0]
try:
json.loads(payload)
except ValueError as err:
raise ValueError('Invalid JSON payload') from err
return payload
def forward_event(payload, detail_type):
"""Forward event to EventBridge"""
return event_bridge_client.put_events(
Entries=[
{
'Source': 'github.com',
'DetailType': detail_type,
'Detail': payload,
'EventBusName': event_bus_name
},
]
)
def get_content_type(headers):
"""Helper function to parse content-type from the header"""
raw_content_type = headers.get('content-type')
if raw_content_type is None:
return None
content_type, _ = parse_header(raw_content_type)
return content_type
def print_error(message, headers):
"""Helper function to print errors"""
print(f'ERROR: {message}\nHeaders: {str(headers)}')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment