-
-
Save thislight/d32a565609ee1f2379fc9b3af1d87012 to your computer and use it in GitHub Desktop.
HTTP/1.1服务器实例,带简单查询字符串支持
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from html import escape
from socket import socket, AF_INET, SOCK_STREAM
from urllib.parse import unquote_plus
def read_http_request_header_string(conn: socket, max_header_bytes: int = 65536):
    """Read from *conn* until the end of the HTTP request head and return it.

    Data is received in chunks of up to 4096 bytes and accumulated until the
    blank line (CRLF CRLF) that terminates the header section shows up.

    Args:
        conn: Connected socket to read the request from.
        max_header_bytes: Upper bound on how many bytes may be buffered while
            still looking for the terminator.

    Returns:
        A ``bytearray`` holding everything before the terminating blank line
        (request line and header lines, without the final CRLF CRLF). Bytes
        received after the terminator are discarded.

    Raises:
        ConnectionError: The peer closed the connection before a complete
            header section was received.
        ValueError: More than *max_header_bytes* bytes arrived without a
            terminator.
    """
    terminator = b"\r\n\r\n"
    buffer = bytearray()
    while True:
        chunk = conn.recv(4096)
        if not chunk:
            # recv() returns b"" once the peer has closed its side; without
            # this check the loop would spin forever on a truncated request.
            raise ConnectionError("connection closed before end of HTTP header")
        # The terminator may straddle two chunks, so start the search a few
        # bytes before the newly appended data instead of rescanning it all.
        search_from = max(0, len(buffer) - (len(terminator) - 1))
        buffer.extend(chunk)
        header_length = buffer.find(terminator, search_from)
        if header_length != -1:
            return buffer[:header_length]
        if len(buffer) > max_header_bytes:
            # Refuse to buffer an endless header section.
            raise ValueError("HTTP header section too large")
def read_http_request(conn: socket):
    """Read and parse the request line and headers of an HTTP/1.1 request.

    Args:
        conn: Connected socket to read the request from.

    Returns:
        A ``(method, path, header)`` tuple: the lower-cased request method,
        the request target exactly as sent (query string included), and a
        dict mapping header names (as sent) to their values.

    Raises:
        RuntimeError: The request line names a protocol other than HTTP/1.1.
        ValueError: The request line does not consist of exactly three
            space-separated parts, or a header line contains no colon.
        UnicodeDecodeError: The header section contains non-ASCII bytes.
    """
    s = read_http_request_header_string(conn).decode("ascii")
    request_line, *header_lines = s.split("\r\n")

    # Parse the request line: "<method> <target> <protocol>"
    method, path, protocol = request_line.split(" ")
    if protocol != "HTTP/1.1":
        raise RuntimeError("unknown protocol", protocol)

    # Parse headers
    header = {}
    for line in header_lines:
        # Split on the first colon only: whitespace after the colon is
        # optional and the value itself may contain ": ".
        key, sep, value = line.partition(":")
        if not sep:
            raise ValueError(f"malformed header line: {line!r}")
        header[key] = value.strip(" \t")
    return method.lower(), path, header
# Reason phrases for the status codes this server can put on a status line,
# keyed by numeric status code. build_http_response_header() looks the
# phrase up here, so a code missing from this table cannot be sent.
STATUS_MESSAGES = {
    200: "OK",
    400: "Bad Request",
    404: "Not Found",
    500: "Server Error",
}
def build_http_response_header(status_code, headers):
    """Serialize the head of an HTTP/1.1 response.

    Args:
        status_code: Numeric status; must be a key of ``STATUS_MESSAGES``.
        headers: Iterable of ``(name, value)`` pairs, emitted in order.

    Returns:
        ASCII-encoded bytes: the status line, one ``name: value`` line per
        header, and the blank line that ends the head, all CRLF-separated.

    Raises:
        KeyError: *status_code* has no entry in ``STATUS_MESSAGES``.
    """
    status_line = f"HTTP/1.1 {status_code} {STATUS_MESSAGES[status_code]}"
    field_lines = [f"{key}: {value}" for key, value in headers]
    # Two trailing empty items make join() finish with CRLF CRLF.
    head = "\r\n".join([status_line, *field_lines, "", ""])
    return head.encode("ascii")
# HTML template for the page served at "/". It is rendered with str.format(),
# which replaces the {name} placeholder; any literal brace added to this
# template therefore has to be doubled ("{{" / "}}").
# The form submits with GET, so the entered value comes back as the "name"
# query-string parameter of the next request.
DEFAULT_PAGE_HTML = """<!doctype html>
<html>
<head>
<meta charset="utf-8" />
<title>Default Page</title>
</head>
<body>
<h1>Hello {name}!</h1>
<form method="GET">
<input name="name" label="Hello to..." />
<button type="submit">Submit</button>
</form>
</body>
</html>"""
def read_path(path: str):
    """Split a request target into its path and its raw query string.

    Args:
        path: Request target, e.g. ``"/search?q=x"``.

    Returns:
        A ``(path, query)`` tuple. The split happens at the first ``"?"``;
        ``query`` is ``""`` when the target has no ``"?"`` at all.
    """
    only_path, _, query = path.partition("?")
    return only_path, query
def parse_query(q: str):
    """Parse a URL query string into a dict of decoded keys and values.

    Pairs are separated by ``"&"``; empty segments are skipped. Each pair is
    split at its first ``"="`` only, so a value may itself contain ``"="``,
    and a segment without ``"="`` becomes a key with an empty value. Keys and
    values are decoded with ``unquote_plus`` (percent-escapes and ``"+"`` as
    space). When a key repeats, the last occurrence wins.

    Args:
        q: Raw query string without the leading ``"?"``.

    Returns:
        Dict mapping each decoded key to its decoded value.
    """
    pairs = {}
    for s in q.split("&"):
        if not s:
            continue
        # partition() never fails to unpack, unlike split("="), which raised
        # ValueError on "flag" and on "a=b=c".
        k, _, v = s.partition("=")
        pairs[unquote_plus(k)] = unquote_plus(v)
    return pairs
def handle_http_request(conn: socket):
    """Serve a single HTTP request on *conn*.

    ``GET /`` is answered with the default page, greeting the value of the
    ``name`` query parameter (or "World" when it is missing or empty). Any
    other method or path gets a 404. A request that cannot be read or parsed
    gets a 400. One log line is printed per request.

    Args:
        conn: Connected client socket. It is not closed here.

    Raises:
        Exception: Whatever reading or parsing the request raised, re-raised
            after a 400 response has been attempted.
    """
    # Bodiless responses still announce their length and that the connection
    # ends, so the client never waits for a body.
    empty_body_headers = [
        ("Content-Length", "0"),
        ("Connection", "close"),
    ]

    try:
        method, path, _request_headers = read_http_request(conn)
    except Exception:
        try:
            conn.sendall(build_http_response_header(400, empty_body_headers))
        except OSError:
            # Best effort only: the peer may already be gone, and the
            # original error is the one worth reporting.
            pass
        print(f"- - 400 {STATUS_MESSAGES[400]}")
        raise

    onlypath, query = read_path(path)
    if onlypath != "/" or method != "get":
        conn.sendall(build_http_response_header(404, empty_body_headers))
        print(f"{method.upper()} {path} 404 {STATUS_MESSAGES[404]}")
        return

    try:
        query_dict = parse_query(query)
    except ValueError:
        # A malformed query string is the client's fault, not a server crash.
        conn.sendall(build_http_response_header(400, empty_body_headers))
        print(f"{method.upper()} {path} 400 {STATUS_MESSAGES[400]}")
        return

    # Render page. The name comes straight from the client, so it must be
    # HTML-escaped before it is placed into the markup.
    name = query_dict.get("name") or "World"
    content = DEFAULT_PAGE_HTML.format(name=escape(name)).encode("utf-8")
    headers = [
        ("Content-Type", "text/html; charset=utf-8"),
        ("Content-Length", str(len(content))),
        ("Connection", "close"),
    ]
    # sendall() keeps writing until everything is sent; send() may stop early.
    conn.sendall(build_http_response_header(200, headers))
    conn.sendall(content)
    print(f"{method.upper()} {path} 200 {STATUS_MESSAGES[200]}")
def main(host: str = "127.0.0.1", port: int = 8989):
    """Run the server: accept connections one at a time and serve each.

    Every accepted connection gets an 8 second socket timeout, is handed to
    ``handle_http_request`` and is closed afterwards. The loop runs until the
    process is interrupted.

    Args:
        host: Address to bind the listening socket to.
        port: TCP port to listen on.
    """
    with socket(AF_INET, SOCK_STREAM) as server_port:
        server_port.bind((host, port))
        server_port.listen()
        while True:
            conn, addr = server_port.accept()
            with conn:
                conn.settimeout(8)
                try:
                    handle_http_request(conn)
                except Exception as e:
                    # One malformed, stalled or disconnected client must not
                    # take the whole server down; log it and keep accepting.
                    print(f"error while serving {addr[0]}:{addr[1]}: {e!r}")
# Start the server only when this file is executed as a script, so that
# importing it as a module has no side effects.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment