Skip to content

Instantly share code, notes, and snippets.

Created January 22, 2019 09:38
Show Gist options
  • Save lidaobing/b57eacf3f8bbcc4b276a65979ce44ecf to your computer and use it in GitHub Desktop.
Save lidaobing/b57eacf3f8bbcc4b276a65979ce44ecf to your computer and use it in GitHub Desktop.
import http.server
import socketserver
import datetime
import uuid
from urllib.parse import urlparse, parse_qs
# Public API of this module: only the module-level ``run`` helper is exported.
__all__ = ['run']
# One-slot mutable holder for the user-supplied handler callable.
# ``Mock.run`` stores the callable in ``handlers[0]`` and
# ``ApimHttpHandler.do_Method`` calls ``handlers[0](event, None)`` per request.
handlers = [None]
class ApimHttpHandler(http.server.BaseHTTPRequestHandler):
    """HTTP request handler that mimics an API-gateway function trigger.

    Each incoming request is converted into an "ApiGatewayReceived" event
    dictionary, passed to the callable stored in the module-level
    ``handlers[0]``, and the callable's result is sent back to the client
    as the HTTP response.
    """

    def do_GET(self):
        """Handle an HTTP GET request by delegating to :meth:`do_Method`."""
        return self.do_Method('GET')

    def do_Method(self, method):
        """Build the event for *method*, invoke the handler and reply.

        Args:
            method: HTTP method name recorded in the event as
                ``event["detail"]["httpMethod"]`` (e.g. ``'GET'``).

        Returns:
            ``str()`` of the dictionary returned by the handler callable.

        Raises:
            KeyError: if the handler result has no ``'headers'`` entry.
        """
        # Template event; the placeholder values below are overwritten with
        # data taken from the actual request where the code has it.
        event = {
            "version": "0",
            "id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
            "time": "2006-01-02T15:04:05.999999999Z",
            "base64OwnerPin": "NTk0MDM1MjYzMDE5",
            "resources": ["jrn:apigateway:cn-north-1::api/function"],
            "region": "cn-north-1",
            "detailType": "ApiGatewayReceived",
            "detail": {
                "path": "api request path",
                "headers": {
                    "x-jdcloud-request-id": "headerValue",
                },
                "pathParameters": {},
                "queryParameters": {},
                "body": "string of request payload",
                "requestContext": {
                    "stage": "test",
                    "apiId": "testsvc",
                    "identity": {
                        "accountId": "",
                        "apiKey": "",
                        "authType": "",
                    },
                    "sourceIp": "",
                },
            },
        }
        detail = event["detail"]

        # Current UTC time; "%f" gives microseconds, and the literal "000"
        # pads it to the nanosecond width used by the template timestamp.
        event["time"] = '{0:%Y-%m-%dT%H:%M:%S.%f000Z}'.format(
            datetime.datetime.now(datetime.timezone.utc))
        detail["httpMethod"] = method

        # Copy the request headers using lower-cased names.
        for k in self.headers:
            detail["headers"][k.lower()] = self.headers[k]

        # A fresh request id is exposed both as a header and in the context.
        reqid = str(uuid.uuid4())
        detail["headers"]["x-jdcloud-request-id"] = reqid
        detail["headers"]["j-forwarded-for"] = ""
        detail["headers"]["x-forwarded-for"] = ""
        detail["headers"]["x-proto"] = "HTTP"
        detail["requestContext"]["requestId"] = reqid

        parsed = urlparse(self.path)
        detail["path"] = parsed.path
        query = parse_qs(parsed.query, keep_blank_values=True)
        for name, values in query.items():
            # A parameter given without a value ("?flag") is reported as True.
            values = [True if value == "" else value for value in values]
            # Single occurrences are unwrapped; repeated ones stay a list.
            detail["queryParameters"][name] = (
                values[0] if len(values) == 1 else values)

        res = handlers[0](event, None)

        # NOTE(review): only res['headers'] is referenced by the original
        # code; the 'statusCode' and 'body' keys are assumed here -- confirm
        # against the function platform's response contract.
        self.send_response(int(res.get('statusCode', 200)))
        for k, v in res['headers'].items():
            self.send_header(k, v)
        self.end_headers()
        body = res.get('body')
        if body:
            if isinstance(body, str):
                body = body.encode('utf-8')
            self.wfile.write(body)
        return str(res)
class Mock(object):
    """Local HTTP server that feeds requests to a function handler."""

    def __init__(self):
        """Create a mock server; no state is needed until :meth:`run`."""

    def _parse_server_address(self, server_address):
        """Normalize *server_address* into a ``(host, port)`` tuple.

        Args:
            server_address: ``None`` (meaning ``('', 8080)``), a
                ``"host:port"`` string, or a 2-tuple ``(host, port)``.

        Returns:
            A ``(str, int)`` tuple.

        Raises:
            TypeError: if *server_address* is of an unsupported type or a
                tuple whose length is not 2.
            IndexError: if a string address contains no ``':'``.
            ValueError: if the port cannot be converted to ``int``.
        """
        if server_address is None:
            return ('', 8080)
        if isinstance(server_address, str):
            # Split on the first ':' only; everything after it is the port.
            words = server_address.split(':', 1)
            return (words[0], int(words[1]))
        if isinstance(server_address, tuple) and len(server_address) == 2:
            return (str(server_address[0]), int(server_address[1]))
        raise TypeError

    def run(self, handler, server_address=None):
        """Serve HTTP requests forever, dispatching each to *handler*.

        Args:
            handler: callable stored in the module-level ``handlers[0]``
                slot, which ``ApimHttpHandler`` calls for every request.
            server_address: any form accepted by
                :meth:`_parse_server_address`.
        """
        handlers[0] = handler
        server_address = self._parse_server_address(server_address)
        with socketserver.TCPServer(server_address, ApimHttpHandler) as httpd:
            print("serving at %s" % (server_address,))
            # Block here; without this the ``with`` block would exit at once
            # and close the listening socket before any request is served.
            httpd.serve_forever()
def run(handler, port):
    """Serve *handler* through a :class:`Mock` HTTP server on *port*.

    Args:
        handler: callable invoked as ``handler(event, None)`` for each
            request received by the mock server.
        port: TCP port to listen on; the host is left empty (``''``), the
            same host ``Mock`` uses for its default address.
    """
    mock = Mock()
    mock.run(handler, ('', port))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment