-
-
Save thislight/b69161000e5ea2904057a49f16b5ac8b to your computer and use it in GitHub Desktop.
HTTP/1.1服务器实例,带POST表单支持
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from socket import socket, AF_INET, SOCK_STREAM | |
from urllib.parse import unquote_plus | |
def read_http_request_header_string(conn: socket): | |
buffer = bytearray() | |
while True: | |
buffer.extend(conn.recv(4096)) | |
header_length = buffer.find(b"\r\n\r\n") | |
if header_length != -1: | |
return buffer[:header_length], buffer[header_length+4:] | |
def read_http_request(conn: socket): | |
s, rest = read_http_request_header_string(conn) | |
s = s.decode("ascii") | |
lines = s.split("\r\n") | |
# Parse the first line | |
fstline = lines[0] | |
method, path, protocol = fstline.split(" ") | |
if protocol != "HTTP/1.1": | |
raise RuntimeError("unknown protocol", protocol) | |
method = method.lower() | |
# Parse headers | |
header = {} | |
if len(lines) > 1: | |
for line in lines[1:]: | |
key, value = line.split(": ") | |
header[key] = value | |
return method, path, header, rest | |
def read_http_request_body(conn: socket, length: int, buffer: bytearray): | |
while len(buffer) < length: | |
buffer.extend(conn.recv(4096)) | |
return buffer[:length] | |
STATUS_MESSAGES = { | |
200: "OK", | |
303: "See Other", | |
400: "Bad Request", | |
404: "Not Found", | |
500: "Server Error", | |
} | |
def build_http_response_header(status_code, headers): | |
lines = [ | |
f"HTTP/1.1 {status_code} {STATUS_MESSAGES[status_code]}" | |
] | |
for key, value in headers: | |
lines.append(f"{key}: {value}") | |
lines.append("") | |
lines.append("") | |
return '\r\n'.join(lines).encode('ascii') | |
DEFAULT_PAGE_HTML = """<!doctype html> | |
<html> | |
<head> | |
<meta charset="utf-8" /> | |
<title>Default Page</title> | |
</head> | |
<body> | |
<h1>Hello {name}!</h1> | |
<form method="GET"> | |
<input name="name"/> | |
<button type="submit">Submit</button> | |
</form> | |
<h2>Default Name</h2> | |
<form method="POST"> | |
<input name="default_name" /> | |
<button type="submit">Submit</button> | |
</form> | |
</body> | |
</html>""" | |
def read_path(path: str): | |
parts = path.split("?", maxsplit=1) | |
if len(parts) > 1: | |
return parts[0], parts[1] | |
else: | |
return parts[0], "" | |
def parse_query(q: str): | |
pairs_str = q.split("&") | |
pairs = {} | |
for s in pairs_str: | |
if s: | |
k, v = s.split("=") | |
pairs[unquote_plus(k)] = unquote_plus(v) | |
return pairs | |
global default_name | |
default_name = "World" | |
def handle_http_request(conn: socket): | |
global default_name | |
try: | |
method, path, headers, rest = read_http_request(conn) | |
if "Content-Length" in headers: | |
body = read_http_request_body(conn, int(headers["Content-Length"]), rest) | |
else: | |
body = bytearray() | |
except Exception as e: | |
conn.send(build_http_response_header(400, [])) | |
print(f"- - 400 {STATUS_MESSAGES[400]}") | |
raise e | |
onlypath, query = read_path(path) | |
if onlypath == "/": | |
if method == "get": | |
query_dict = parse_query(query) | |
# Render page | |
content = DEFAULT_PAGE_HTML.format( | |
name=(query_dict["name"] if query_dict.get("name") else default_name) | |
).encode("utf-8") | |
headers = [ | |
("Charset", "UTF-8"), | |
("Content-Length",str(len(content))), | |
("Connection", "close"), | |
] | |
conn.send(build_http_response_header(200, headers)) | |
conn.send(content) | |
print(f"{method.upper()} {path} 200 {STATUS_MESSAGES[200]}") | |
return | |
elif method == "post": | |
form_data_s = body.decode("utf-8") | |
form_dict = parse_query(form_data_s) | |
if form_dict.get("default_name"): | |
default_name = form_dict["default_name"] | |
conn.send(build_http_response_header(303, [ | |
("Connection", "close"), | |
("Location", "."), | |
])) | |
print(f"{method.upper()} {path} 303 {STATUS_MESSAGES[303]}") | |
return | |
conn.send(build_http_response_header(404, [])) | |
print(f"{method.upper()} {path} 404 {STATUS_MESSAGES[404]}") | |
def main(): | |
with socket(AF_INET, SOCK_STREAM) as server_port: | |
server_port.bind(("127.0.0.1", 8989)) | |
server_port.listen() | |
while True: | |
conn, addr = server_port.accept() | |
with conn: | |
conn.settimeout(8) | |
handle_http_request(conn) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment