Skip to content

Instantly share code, notes, and snippets.

@thislight
Created March 20, 2023 10:29
Show Gist options
  • Select an option

  • Save thislight/d32a565609ee1f2379fc9b3af1d87012 to your computer and use it in GitHub Desktop.

Select an option

Save thislight/d32a565609ee1f2379fc9b3af1d87012 to your computer and use it in GitHub Desktop.
HTTP/1.1服务器实例,带简单查询字符串支持
from html import escape
from socket import socket, AF_INET, SOCK_STREAM
from urllib.parse import unquote_plus
def read_http_request_header_string(conn: socket):
    """Read from *conn* until the blank line that ends the HTTP header block.

    Args:
        conn: A connected socket (anything with a ``recv(bufsize)`` method).

    Returns:
        A ``bytearray`` holding everything before the first ``\\r\\n\\r\\n``
        (request line and header lines, terminator excluded). Bytes received
        after the terminator are discarded.

    Raises:
        ConnectionError: If the peer closes the connection before the
            terminator has been received.
    """
    terminator = b"\r\n\r\n"
    buffer = bytearray()
    while True:
        chunk = conn.recv(4096)
        if not chunk:
            # recv() returns b"" once the peer has closed the connection;
            # without this check the loop would spin forever.
            raise ConnectionError(
                "connection closed before the end of the HTTP header"
            )
        # The terminator may straddle two chunks, so resume the search a few
        # bytes before the newly appended data instead of rescanning it all.
        search_from = max(0, len(buffer) - (len(terminator) - 1))
        buffer.extend(chunk)
        header_length = buffer.find(terminator, search_from)
        if header_length != -1:
            return buffer[:header_length]
def read_http_request(conn: socket):
    """Read and parse the request line and headers of an HTTP/1.1 request.

    Args:
        conn: A connected socket to read the request from.

    Returns:
        A ``(method, path, header)`` tuple: the lower-cased method, the raw
        request target (query string included) and a dict mapping header
        names (as sent) to their values.

    Raises:
        RuntimeError: If the protocol in the request line is not ``HTTP/1.1``.
        ValueError: If the request line does not have exactly three
            space-separated parts, or a header line has no ``name:`` part.
        UnicodeDecodeError: If the header block is not pure ASCII.
    """
    head = read_http_request_header_string(conn).decode("ascii")
    request_line, *header_lines = head.split("\r\n")

    # Parse the first line
    method, path, protocol = request_line.split(" ")
    if protocol != "HTTP/1.1":
        raise RuntimeError("unknown protocol", protocol)

    # Parse headers
    header = {}
    for line in header_lines:
        # Split on the first colon only: the value may itself contain ": ",
        # and the whitespace after the colon is optional.
        key, colon, value = line.partition(":")
        if not colon or not key:
            raise ValueError(f"malformed header line: {line!r}")
        header[key] = value.strip(" \t")
    return method.lower(), path, header
# Reason phrases for the status codes this server puts on the status line.
STATUS_MESSAGES = dict(
    [
        (200, "OK"),
        (400, "Bad Request"),
        (404, "Not Found"),
        (500, "Server Error"),
    ]
)
def build_http_response_header(status_code, headers):
    """Serialize the status line and header fields of an HTTP/1.1 response.

    Args:
        status_code: A key of ``STATUS_MESSAGES``.
        headers: An iterable of ``(name, value)`` pairs.

    Returns:
        The ASCII-encoded header block, terminated by an empty line.
    """
    status_line = f"HTTP/1.1 {status_code} {STATUS_MESSAGES[status_code]}\r\n"
    field_lines = "".join(f"{name}: {content}\r\n" for name, content in headers)
    # The trailing bare CRLF is the empty line separating header and body.
    return (status_line + field_lines + "\r\n").encode("ascii")
# HTML template for the page served at "/". It is rendered with str.format(),
# which substitutes the `{name}` placeholder; any other literal brace added to
# this template would have to be doubled.
DEFAULT_PAGE_HTML = """<!doctype html>
<html>
<head>
<meta charset="utf-8" />
<title>Default Page</title>
</head>
<body>
<h1>Hello {name}!</h1>
<form method="GET">
<input name="name" label="Hello to..." />
<button type="submit">Submit</button>
</form>
</body>
</html>"""
def read_path(path: str):
    """Split a request target into its path and query string.

    Returns:
        A ``(path, query)`` tuple, cut at the first ``?``. ``query`` is the
        empty string when the target has no ``?``.
    """
    only_path, _, query = path.partition("?")
    return only_path, query
def parse_query(q: str):
    """Parse a URL query string into a dict of decoded keys and values.

    Pairs are separated by ``&``; empty segments are skipped. Each pair is
    split on its first ``=`` only, so a value may contain ``=`` and a key
    without ``=`` maps to the empty string. Keys and values are
    percent-decoded, with ``+`` decoded as a space. When a key repeats, the
    last occurrence wins.

    Args:
        q: The query string, without the leading ``?``.

    Returns:
        A dict mapping each decoded key to its decoded value.
    """
    pairs = {}
    for item in q.split("&"):
        if not item:
            continue
        # partition() never raises, unlike unpacking the result of
        # split("="), which fails for "flag" and for "a=b=c".
        key, _, value = item.partition("=")
        pairs[unquote_plus(key)] = unquote_plus(value)
    return pairs
def handle_http_request(conn: socket):
    """Serve a single HTTP request on *conn* and log one line to stdout.

    ``GET /`` is answered with the default page, greeting the ``name`` query
    parameter (or "World" when it is missing or empty). Any other method or
    path gets an empty 404 response. A request that cannot be read or parsed
    gets an empty 400 response.

    Args:
        conn: The accepted client socket. It is not closed here.

    Raises:
        Exception: Whatever ``read_http_request`` raised is re-raised after
            the 400 response has been attempted.
        OSError: If writing a response to the socket fails.
    """
    try:
        method, path, _request_headers = read_http_request(conn)
    except Exception:
        try:
            conn.sendall(build_http_response_header(400, []))
        except OSError:
            # Best effort: the peer may already be gone (timeout, reset).
            # The original error is the one worth reporting.
            pass
        print(f"- - 400 {STATUS_MESSAGES[400]}")
        raise

    onlypath, query = read_path(path)
    if onlypath != "/" or method != "get":
        conn.sendall(build_http_response_header(404, []))
        print(f"{method.upper()} {path} 404 {STATUS_MESSAGES[404]}")
        return

    try:
        query_dict = parse_query(query)
    except ValueError:
        # A malformed query string is a client error, not a server crash.
        conn.sendall(build_http_response_header(400, []))
        print(f"{method.upper()} {path} 400 {STATUS_MESSAGES[400]}")
        return

    # Render page. The name comes straight from the client, so it must be
    # HTML-escaped before being placed into the markup.
    name = query_dict.get("name") or "World"
    content = DEFAULT_PAGE_HTML.format(name=escape(name)).encode("utf-8")
    response_headers = [
        ("Content-Type", "text/html; charset=utf-8"),
        ("Content-Length", str(len(content))),
        ("Connection", "close"),
    ]
    # sendall() keeps writing until everything is sent; send() may stop early.
    conn.sendall(build_http_response_header(200, response_headers) + content)
    print(f"{method.upper()} {path} 200 {STATUS_MESSAGES[200]}")
def main():
    """Run the server on 127.0.0.1:8989, handling one connection at a time.

    Each accepted connection gets an 8-second socket timeout and is closed
    once its request has been handled. The loop runs until the process is
    interrupted.
    """
    with socket(AF_INET, SOCK_STREAM) as server_port:
        server_port.bind(("127.0.0.1", 8989))
        server_port.listen()
        while True:
            conn, _addr = server_port.accept()
            with conn:
                conn.settimeout(8)
                try:
                    handle_http_request(conn)
                except Exception as exc:
                    # One bad or slow client must not take the server down;
                    # report the failure and go back to accepting.
                    print(f"- - request failed: {exc!r}")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment