Skip to content

Instantly share code, notes, and snippets.

@thislight
Created March 20, 2023 12:13
Show Gist options
  • Save thislight/b69161000e5ea2904057a49f16b5ac8b to your computer and use it in GitHub Desktop.
Save thislight/b69161000e5ea2904057a49f16b5ac8b to your computer and use it in GitHub Desktop.
HTTP/1.1服务器实例,带POST表单支持
from socket import socket, AF_INET, SOCK_STREAM
from urllib.parse import unquote_plus
def read_http_request_header_string(conn: socket):
    """Read from ``conn`` until the end of the HTTP header section is seen.

    Data is received in chunks of up to 4096 bytes until the blank line
    (CRLF CRLF) that terminates the header section has arrived.

    Returns:
        A ``(header, rest)`` pair of ``bytearray`` objects: the bytes before
        the terminator, and whatever bytes were already received after it
        (the beginning of the request body, possibly empty).

    Raises:
        ConnectionError: If the peer closes the connection before the
            terminator has been received.
    """
    terminator = b"\r\n\r\n"
    buffer = bytearray()
    while True:
        chunk = conn.recv(4096)
        if not chunk:
            # recv() returning b"" means the peer closed the connection;
            # without this check the loop would spin forever.
            raise ConnectionError("connection closed before end of HTTP header")
        # Resume the search just before the new data: a terminator split
        # across two chunks is still found, and old data is not rescanned.
        search_from = max(0, len(buffer) - (len(terminator) - 1))
        buffer.extend(chunk)
        header_length = buffer.find(terminator, search_from)
        if header_length != -1:
            body_start = header_length + len(terminator)
            return buffer[:header_length], buffer[body_start:]
def read_http_request(conn: socket):
    """Read and parse the request line and header fields of one request.

    Returns:
        A ``(method, path, header, rest)`` tuple: the lower-cased method,
        the request target exactly as sent, a dict mapping header field
        names (as sent by the client) to their values, and the bytes that
        were already received after the header section.

    Raises:
        RuntimeError: If the protocol in the request line is not
            ``HTTP/1.1``.
        ValueError: If the request line does not consist of exactly three
            space-separated parts, if a header line contains no colon, or
            if the header section is not ASCII (``UnicodeDecodeError``).
    """
    raw_header, rest = read_http_request_header_string(conn)
    request_line, *field_lines = raw_header.decode("ascii").split("\r\n")

    # Parse the request line.
    method, path, protocol = request_line.split(" ")
    if protocol != "HTTP/1.1":
        raise RuntimeError("unknown protocol", protocol)

    # Parse the header fields.
    header = {}
    for line in field_lines:
        # Split on the first colon only: values may themselves contain
        # colons, and the space after the colon is optional.
        name, separator, value = line.partition(":")
        if not separator:
            raise ValueError(f"malformed header line: {line!r}")
        header[name] = value.strip(" \t")
    return method.lower(), path, header, rest
def read_http_request_body(conn: socket, length: int, buffer: bytearray):
    """Return the first ``length`` bytes of the request body.

    Args:
        conn: Connection to keep reading from.
        length: Number of body bytes expected.
        buffer: Body bytes that were already received; it is extended in
            place with newly received data.

    Returns:
        A ``bytearray`` holding the first ``length`` bytes of ``buffer``.

    Raises:
        ConnectionError: If the peer closes the connection before
            ``length`` bytes are available.
    """
    while len(buffer) < length:
        chunk = conn.recv(4096)
        if not chunk:
            # An empty read means the peer is gone; looping again would
            # never make progress.
            raise ConnectionError(
                f"connection closed after {len(buffer)} of {length} body bytes"
            )
        buffer.extend(chunk)
    return buffer[:length]
# Reason phrases written on the status line, keyed by numeric status code.
STATUS_MESSAGES = {
    200: "OK",
    303: "See Other",
    400: "Bad Request",
    404: "Not Found",
    500: "Server Error",
}
def build_http_response_header(status_code, headers):
    """Serialise a status line and header fields into response header bytes.

    Args:
        status_code: Numeric status; must be a key of ``STATUS_MESSAGES``.
        headers: Iterable of ``(name, value)`` pairs, written in order.

    Returns:
        The ASCII-encoded header section, terminated by a blank line.
    """
    status_line = f"HTTP/1.1 {status_code} {STATUS_MESSAGES[status_code]}"
    field_lines = [f"{key}: {value}" for key, value in headers]
    # The two trailing empty entries become the CRLF that ends the last line
    # and the blank line that ends the header section.
    return "\r\n".join([status_line, *field_lines, "", ""]).encode("ascii")
# HTML template for the page served at "/". The "{name}" placeholder is filled
# in with str.format() by handle_http_request. The first form submits a "name"
# field via GET; the second submits a "default_name" field via POST.
DEFAULT_PAGE_HTML = """<!doctype html>
<html>
<head>
<meta charset="utf-8" />
<title>Default Page</title>
</head>
<body>
<h1>Hello {name}!</h1>
<form method="GET">
<input name="name"/>
<button type="submit">Submit</button>
</form>
<h2>Default Name</h2>
<form method="POST">
<input name="default_name" />
<button type="submit">Submit</button>
</form>
</body>
</html>"""
def read_path(path: str):
    """Split a request target at its first ``?``.

    Returns:
        A ``(path, query)`` tuple; ``query`` is ``""`` when the target has
        no ``?`` or nothing follows it.
    """
    onlypath, _, query = path.partition("?")
    return onlypath, query
def parse_query(q: str):
    """Parse a URL-encoded query string or form body into a dict.

    Fields are separated by ``&``; empty fields are skipped. Each field is
    split at its first ``=`` only, so a field without ``=`` maps to an empty
    value and a value may itself contain ``=``. Keys and values are decoded
    with ``unquote_plus``. When a key occurs more than once, the last
    occurrence wins.

    Returns:
        A dict mapping decoded keys to decoded values.
    """
    pairs = {}
    for field in q.split("&"):
        if not field:
            continue
        # partition() never raises, unlike unpacking the result of split("=").
        key, _, value = field.partition("=")
        pairs[unquote_plus(key)] = unquote_plus(value)
    return pairs
# Name greeted by the default page when the request carries no "name" query
# parameter. A ``global`` statement has no effect at module level, so a plain
# assignment is all that is needed here.
default_name = "World"
def handle_http_request(conn: socket):
    """Read one HTTP request from ``conn`` and write the response.

    Routes:
        ``GET /``: renders ``DEFAULT_PAGE_HTML``, greeting the ``name``
            query parameter, or ``default_name`` when it is absent or empty.
        ``POST /``: stores a non-empty ``default_name`` form field in the
            module-level ``default_name`` and answers ``303 See Other``.
        Anything else: ``404 Not Found``.

    One log line per response is printed to stdout.

    Raises:
        Exception: Whatever reading or parsing the request raised; a
            ``400 Bad Request`` is attempted before it is re-raised.
    """
    global default_name
    import html  # function-scope import keeps this block self-contained

    try:
        method, path, headers, rest = read_http_request(conn)
        # Header field names are case-insensitive, so do not depend on the
        # client's capitalisation of "Content-Length".
        content_length = next(
            (
                value
                for key, value in headers.items()
                if key.lower() == "content-length"
            ),
            None,
        )
        if content_length is None:
            body = bytearray()
        else:
            length = int(content_length)
            if length < 0:
                raise ValueError(f"negative Content-Length: {length}")
            body = read_http_request_body(conn, length, rest)
    except Exception:
        try:
            conn.sendall(build_http_response_header(400, []))
        except OSError:
            # Best effort only: the connection may already be unusable, and
            # the original error is the one worth reporting.
            pass
        print(f"- - 400 {STATUS_MESSAGES[400]}")
        raise

    onlypath, query = read_path(path)
    if onlypath == "/":
        if method == "get":
            query_dict = parse_query(query)
            name = query_dict.get("name") or default_name
            # Both the query parameter and default_name come from clients,
            # so escape the value before embedding it in markup.
            content = DEFAULT_PAGE_HTML.format(name=html.escape(name)).encode("utf-8")
            response_headers = [
                # "Charset" is not an HTTP header; the encoding belongs in
                # Content-Type.
                ("Content-Type", "text/html; charset=utf-8"),
                ("Content-Length", str(len(content))),
                ("Connection", "close"),
            ]
            # sendall() keeps writing until everything is sent; send() may
            # stop after a partial write.
            conn.sendall(build_http_response_header(200, response_headers))
            conn.sendall(content)
            print(f"{method.upper()} {path} 200 {STATUS_MESSAGES[200]}")
            return
        if method == "post":
            # Undecodable bytes are replaced rather than aborting the request.
            form_dict = parse_query(body.decode("utf-8", errors="replace"))
            if form_dict.get("default_name"):
                default_name = form_dict["default_name"]
            conn.sendall(build_http_response_header(303, [
                ("Connection", "close"),
                ("Location", "."),
            ]))
            print(f"{method.upper()} {path} 303 {STATUS_MESSAGES[303]}")
            return

    conn.sendall(build_http_response_header(404, []))
    print(f"{method.upper()} {path} 404 {STATUS_MESSAGES[404]}")
def main():
    """Listen on 127.0.0.1:8989 and serve connections one at a time.

    Each accepted connection gets an 8 second socket timeout and is closed
    once its request has been handled. A failure while handling one
    connection is reported on stderr and does not stop the server.
    """
    import traceback  # function-scope import keeps this block self-contained

    with socket(AF_INET, SOCK_STREAM) as server_port:
        server_port.bind(("127.0.0.1", 8989))
        server_port.listen()
        while True:
            conn, _addr = server_port.accept()
            with conn:
                conn.settimeout(8)
                try:
                    handle_http_request(conn)
                except Exception:
                    # A malformed request, timeout or dropped connection must
                    # only end this connection, not the accept loop.
                    traceback.print_exc()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment