|
#!/usr/bin/env python |
|
|
|
from __future__ import print_function |
|
|
|
import sys |
|
|
|
import hashlib |
|
import hmac |
|
from multiprocessing import Pool |
|
import random |
|
import requests |
|
|
|
|
|
# Helper constants |
|
class YAY_COLORS:
    """ANSI terminal escape sequences used to color the progress markers."""

    GREEN = '\033[92m'  # bright green foreground
    RED = '\033[91m'  # bright red foreground
    RESET = '\033[0m'  # back to the terminal's default attributes
|
|
|
|
|
# One-character markers, one per request category. Each marker doubles as the
# progress glyph printed when a request of that category completes.
VALID_REQUEST = '.'  # correctly signed POST
MISSING_SIG = '*'  # POST without a signature header
MISMATCH_SIG = '#'  # POST signed with the wrong secret
UNSUPPORTED_DIGEST = '%'  # POST whose signature names an unknown digest
NO_GET = '@'  # GET request

# Outcome flags: did the server answer with the expected status code?
CORRECT_RESPONSE, INCORRECT_RESPONSE = True, False
|
|
|
# Test constants
TEST_URL = "http://localhost:3333/webhook"  # Use this for local testing
# TEST_URL = "http://your.host.tld:3333/webhook"  # Use this for more realistic testing

REAL_WEBHOOK_SECRET = b"asdf"  # Signing with this secret makes a valid request
FAKE_WEBHOOK_SECRET = b"asdff"  # Signing with this secret makes an invalid request

# Fake payload won't trigger any actual bot messages, but will still be
# received and processed as normal.
PAYLOAD = b"""{"event": "testing", "repository": {"full_name": "testing/123"}}"""

# HMAC-SHA256 of the payload under the real secret. It backs both the correct
# signature and the one that pairs a correct digest value with a bogus
# algorithm prefix.
hash_ = hmac.new(REAL_WEBHOOK_SECRET, PAYLOAD, hashlib.sha256)
CORRECT_SIGNATURE = 'sha256={}'.format(hash_.hexdigest())
FAKE_DIGEST_SIGNATURE = 'NaN={}'.format(hash_.hexdigest())

# Incorrect signature: same payload and algorithm, but signed with the fake
# secret. A random string could work too; this keeps the format consistent.
hash_ = hmac.new(FAKE_WEBHOOK_SECRET, PAYLOAD, hashlib.sha256)
INCORRECT_SIGNATURE = 'sha256={}'.format(hash_.hexdigest())
|
|
|
|
|
def do_request(method_fn, request_type, expected_status, headers, timeout=60):
    """Send one request to ``TEST_URL`` and grade the response status.

    Args:
        method_fn: ``requests.post`` or ``requests.get`` (or any callable with
            the same calling convention). ``PAYLOAD`` is sent as the body only
            when this is ``requests.post``.
        request_type: One-character marker naming the request category. It is
            returned unchanged and is also the text of the colored marker.
        expected_status: HTTP status code the server is expected to return.
        headers: Mapping of HTTP headers to send with the request.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, so a single stalled
            connection would block a worker (and the caller waiting on its
            result) forever.

    Returns:
        A ``(request_type, is_correct_response, colored_marker)`` tuple.
        ``colored_marker`` is ``request_type`` wrapped in the green escape code
        when the status matched and the red one otherwise, followed by a reset.

    Raises:
        requests.exceptions.RequestException: If the request cannot be
            completed (connection failure, timeout, ...). Not handled here.
    """
    # Only POST requests carry the payload.
    body = PAYLOAD if method_fn is requests.post else None
    r = method_fn(TEST_URL, data=body, headers=headers, timeout=timeout)

    is_correct_response = CORRECT_RESPONSE if r.status_code == expected_status else INCORRECT_RESPONSE
    color = YAY_COLORS.GREEN if is_correct_response else YAY_COLORS.RED
    return (
        request_type,
        is_correct_response,
        color + request_type + YAY_COLORS.RESET,
    )
|
|
|
|
|
def send_valid_request():
    """POST the payload with the correct signature, expecting a 200 response."""
    return do_request(
        requests.post,
        VALID_REQUEST,
        200,
        {
            'X-Hub-Signature-256': CORRECT_SIGNATURE,
            'content-type': 'application/json',
        },
    )
|
|
|
|
|
def send_missing_sig():
    """POST the payload with no signature header, expecting a 401 response."""
    return do_request(
        requests.post,
        MISSING_SIG,
        401,
        {
            'content-type': 'application/json',
        },
    )
|
|
|
|
|
def send_mismatch_sig():
    """POST the payload signed with the fake secret, expecting a 403 response."""
    return do_request(
        requests.post,
        MISMATCH_SIG,
        403,
        {
            'X-Hub-Signature-256': INCORRECT_SIGNATURE,
            'content-type': 'application/json',
        },
    )
|
|
|
|
|
def send_unsupported_digest():
    """POST the payload with a bogus digest prefix, expecting a 501 response."""
    return do_request(
        requests.post,
        UNSUPPORTED_DIGEST,
        501,
        {
            'X-Hub-Signature-256': FAKE_DIGEST_SIGNATURE,
            'content-type': 'application/json',
        },
    )
|
|
|
|
|
def get_webhook():
    """GET the webhook URL with no headers, expecting a 405 response."""
    no_headers = {}
    return do_request(requests.get, NO_GET, 405, no_headers)
|
|
|
|
|
def do_random_request(random_seed):
    """Pick one of the request senders at random and run it.

    The module-level ``random`` generator is re-seeded with ``random_seed``
    before choosing, so the sender picked depends only on the seed passed in.

    Args:
        random_seed: Seed handed to ``random.seed``.

    Returns:
        Whatever the chosen sender returns.
    """
    senders = (
        send_valid_request,
        send_missing_sig,
        send_mismatch_sig,
        send_unsupported_digest,
        get_webhook,
    )
    random.seed(random_seed)
    chosen_sender = random.choice(senders)
    return chosen_sender()
|
|
|
|
|
class Progress:
    """Per-category tallies of graded responses, plus live progress output.

    Each attribute is a dict keyed by ``CORRECT_RESPONSE`` /
    ``INCORRECT_RESPONSE`` that counts how many requests of that category
    received the expected / an unexpected status.
    """

    def __init__(self):
        def fresh_tally():
            # Every category gets its own independent counter dict.
            return {
                CORRECT_RESPONSE: 0,
                INCORRECT_RESPONSE: 0,
            }

        self.valid_requests = fresh_tally()
        self.missing_sig = fresh_tally()
        self.mismatch_sig = fresh_tally()
        self.unsupported_digest = fresh_tally()
        self.no_get = fresh_tally()

    def callback_(self, result):
        """Record one finished request and echo its marker.

        Args:
            result: A ``(request_type, status, msg)`` tuple. ``request_type``
                selects the tally, ``status`` selects the counter inside it,
                and ``msg`` is written to stdout without a trailing newline.

        Raises:
            KeyError: If ``request_type`` is not one of the known markers.
        """
        request_type, status, msg = result

        tally_for_marker = {
            VALID_REQUEST: self.valid_requests,
            MISSING_SIG: self.missing_sig,
            MISMATCH_SIG: self.mismatch_sig,
            UNSUPPORTED_DIGEST: self.unsupported_digest,
            NO_GET: self.no_get,
        }
        tally = tally_for_marker[request_type]
        tally[status] += 1

        # Flush right away so markers appear as requests complete.
        print(msg, end='')
        sys.stdout.flush()
|
|
|
|
|
# Main
NUM_PROCS = 32
NUM_REQUESTS = 1000000

RANDOM_SEED = 12345  # For reproducibility of # valid/invalid requests

# Report layout; filled with (sent, correct, incorrect) for each category, in
# the order the categories appear below.
SUMMARY_TEMPLATE = """
Valid requests --> {} sent
{:>5d} correct response(s)
{:>5d} incorrect response(s)

Invalid requests:
Missing Signatures --> {} sent
{:>5d} correct response(s)
{:>5d} incorrect response(s)
Mismatched Signatures --> {} sent
{:>5d} correct response(s)
{:>5d} incorrect response(s)
Unsupported Digest --> {} sent
{:>5d} correct response(s)
{:>5d} incorrect response(s)
GET /webhook --> {} sent
{:>5d} correct response(s)
{:>5d} incorrect response(s)
"""


def main():
    """Fire ``NUM_REQUESTS`` random requests from a worker pool and report.

    Each task gets its own seed drawn from a generator seeded with
    ``RANDOM_SEED``, so the mix of request categories is reproducible. Results
    are tallied by ``Progress.callback_`` as they complete, and a summary is
    printed at the end. If a task raises an ordinary exception, the workers are
    terminated, the error is reported on stderr, and the summary of whatever
    was tallied so far is still printed.
    """
    random.seed(RANDOM_SEED)

    pool = Pool(NUM_PROCS)
    progress = Progress()

    # The bound is spelled as an int: ``1e10 - 1`` is a float, and
    # random.randint() raises TypeError for float arguments on Python 3.12+.
    max_seed = 10 ** 10 - 1
    results_ = [
        pool.apply_async(
            do_random_request,
            (random.randint(1, max_seed),),
            callback=progress.callback_,
        )
        for _ in range(NUM_REQUESTS)
    ]

    try:
        # .get() re-raises any exception that occurred inside the worker.
        for pending in results_:
            pending.get()
        pool.close()
    except Exception as exc:
        # Best effort: stop the workers and say why instead of hiding the
        # failure, then fall through to the partial summary.
        pool.terminate()
        print('\nAborted early: {!r}'.format(exc), file=sys.stderr)
    except BaseException:
        # e.g. KeyboardInterrupt. join() requires a prior close()/terminate(),
        # so stop the pool before propagating.
        pool.terminate()
        raise
    finally:
        pool.join()

    def counts(tally):
        # (sent, correct, incorrect) for one category.
        return (
            sum(tally.values()),
            tally[CORRECT_RESPONSE],
            tally[INCORRECT_RESPONSE],
        )

    summary = SUMMARY_TEMPLATE.format(
        *(
            counts(progress.valid_requests)
            + counts(progress.missing_sig)
            + counts(progress.mismatch_sig)
            + counts(progress.unsupported_digest)
            + counts(progress.no_get)
        )
    ).strip()

    print('\n', summary, sep='')


# Guarded so that worker processes started with the "spawn" method, which
# re-import this module, do not launch the whole run again.
if __name__ == '__main__':
    main()