-
-
Save gbrova/3493875a92875ade3259e192dc679d94 to your computer and use it in GitHub Desktop.
Super simple HTTP server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import threading | |
from typing import List, Dict, Tuple | |
# Separator between a header's name and its value on a single header line.
KV_SEP = ": "
# Line terminator used throughout an HTTP message.
CRLF = "\r\n"
# Two consecutive line terminators: marks the end of the header section.
EMPTY_LINE = CRLF * 2
class ConnectionClosedError(Exception):
    """Signals that no more bytes can be read from the client connection."""
class Request:
    """A received HTTP request: raw request line, header mapping and body."""

    def __init__(self, request_line, headers: dict, body: bytes):
        # The raw first line of the request, e.g. "GET /index.html HTTP/1.1".
        # It is kept unparsed; path() and should_keepalive() inspect it on demand.
        self.request_line = request_line
        self.headers = headers
        self.body = body

    def should_keepalive(self):
        """Return True if the connection should stay open after the response.

        The default comes from the protocol version in the request line:
        anything mentioning "HTTP/1.0" defaults to closing, everything else to
        keeping the connection open. A ``Connection`` header (name and tokens
        matched case-insensitively) overrides that default: ``close`` always
        wins, ``keep-alive`` turns persistence on.
        """
        keepalive = "HTTP/1.0" not in self.request_line
        for name, value in self.headers.items():
            if name.strip().lower() != "connection":
                continue
            # The header value may be a comma-separated list of tokens.
            tokens = {token.strip().lower() for token in str(value).split(",")}
            if "close" in tokens:
                return False
            if "keep-alive" in tokens:
                keepalive = True
        return keepalive

    def path(self):
        """Return the request target without its first character.

        For a target such as "/index.html" this yields "index.html".

        Raises:
            ValueError: if the request line does not consist of exactly three
                space-separated parts (method, target, HTTP version).
        """
        _method, target, _httpversion = self.request_line.split(" ")
        return target[1:]
class RequestBuilder:
    """Assembles Request objects from a stream of byte chunks.

    ``more_data_fn`` is a zero-argument callable returning the next chunk of
    bytes from the client. parse_request() calls it as often as needed to
    complete one request. Bytes read past the end of a request are kept and
    consumed first by the next parse_request() call on the same builder.
    """

    def __init__(self, more_data_fn):
        self.more_data = more_data_fn
        # Bytes already received that lie beyond the end of the last request.
        self._pending = b""

    def _read_header(self) -> Tuple[str, bytes]:
        """Read until the end of the header section.

        Returns:
            A tuple ``(headers_str, continuation)``: the decoded request line
            plus header lines (without the terminating blank line), and the
            raw bytes that followed the blank line in the data read so far.
        """
        terminator = EMPTY_LINE.encode("ascii")
        buffer = self._pending
        self._pending = b""
        while True:
            # Line breaks ahead of the request line carry no information;
            # dropping them also guarantees the terminator is never found at
            # offset 0 with an empty header section in front of it.
            buffer = buffer.lstrip(b"\r\n")
            end = buffer.find(terminator)
            if end >= 0:
                break
            buffer += self.more_data()
        # Only the header section is decoded. The bytes after it may be an
        # arbitrary binary body and must stay bytes. latin-1 maps every byte
        # to a character, so decoding cannot fail.
        headers_str = buffer[:end].decode("latin-1")
        continuation = buffer[end + len(terminator):]
        return headers_str, continuation

    def _parse_headers_str(self, headers_str: str) -> Tuple[str, Dict]:
        """Split the header section into the request line and a header dict.

        Duplicate header names are collapsed; the last occurrence wins.
        Lines without a colon are skipped.
        """
        # The first line is the request line, which is not a header.
        request_line, *header_lines = headers_str.split(CRLF)
        headers_dict = {}
        for line in header_lines:
            # Split on the first colon only, so values may themselves contain
            # ": " and the space after the colon is optional.
            name, colon, value = line.partition(":")
            if not colon:
                continue
            headers_dict[name.strip()] = value.strip()
        return request_line, headers_dict

    def _read_remaining_body(self, continuation_bytes: bytes, total_payload_length: int) -> bytes:
        """Return exactly ``total_payload_length`` body bytes.

        ``continuation_bytes`` is what was already read after the header
        section. More chunks are fetched until enough bytes are available.
        Anything beyond the requested length is stored for the next request
        instead of being returned as part of this body.
        """
        wanted = max(total_payload_length, 0)
        chunks = [continuation_bytes]
        received = len(continuation_bytes)
        while received < wanted:
            chunk = self.more_data()
            chunks.append(chunk)
            received += len(chunk)
        data = b"".join(chunks)
        self._pending = data[wanted:]
        return data[:wanted]

    def parse_request(self) -> "Request":
        """Read one complete request and return it as a Request.

        Raises:
            ValueError: if the Content-Length header is not an integer.
            Whatever ``more_data_fn`` raises when no more data is available.
        """
        headers_str, continuation_bytes = self._read_header()
        request_line, headers = self._parse_headers_str(headers_str)
        payload_length = 0
        for name, value in headers.items():
            # Header names are case-insensitive.
            if name.lower() == "content-length":
                payload_length = int(value)
        body = self._read_remaining_body(continuation_bytes, payload_length)
        return Request(request_line, headers, body)
class Response:
    """An HTTP response: status code, header mapping and raw body bytes."""

    def __init__(self, status_code: int, headers: dict, body: bytes):
        self.status_code = status_code
        self.headers = headers
        self.body = body

    def make_length_header(self):
        """Set the Content-Length header to the current body size in bytes."""
        self.headers["Content-Length"] = str(len(self.body))

    def status_code_msg(self):
        """Return the reason phrase for the status code.

        Known codes map to their standard phrase (200 -> "OK",
        404 -> "Not Found"); anything else yields "UNKNOWN_MSG".
        """
        # Imported here so the class does not depend on a module-level import.
        from http import HTTPStatus

        try:
            return HTTPStatus(self.status_code).phrase
        except ValueError:
            return "UNKNOWN_MSG"

    def as_http_bytes(self) -> bytes:
        """Serialize the response for sending over the wire.

        The layout is: status line, one line per header, an empty line, then
        the body. Content-Length is (re)computed first, which updates
        ``self.headers`` in place.
        """
        self.make_length_header()
        lines = [f"HTTP/1.1 {self.status_code} {self.status_code_msg()}"]
        lines.extend(f"{name}{KV_SEP}{value}" for name, value in self.headers.items())
        header_bytes = (CRLF.join(lines) + EMPTY_LINE).encode("ascii")
        return header_bytes + self.body
def handle_request(request: Request) -> Response:
    """Serve the file named by the request path from the working directory.

    Returns a 200 response carrying the file's bytes, or a 404 response when
    the path does not name a readable file inside the current working
    directory. Paths that escape that directory (via ".." segments, an
    absolute path, or a symlink) are answered with 404 as well.

    Raises:
        ValueError: propagated from ``request.path()`` for a malformed
            request line.
    """
    # Imported here so the function does not depend on a module-level import.
    from pathlib import Path

    def not_found() -> Response:
        # That resource doesn't exist (or may not be served): return a 404.
        return Response(404, {}, "Not found :(".encode("utf-8"))

    relative = request.path()
    try:
        root = Path.cwd().resolve()
        target = (root / relative).resolve()
        # The request path is untrusted input: only serve files that really
        # live below the working directory after resolving ".." and symlinks.
        if root not in target.parents:
            return not_found()
        # read_bytes() opens and closes the file itself, so no handle leaks.
        data = target.read_bytes()
    except (OSError, ValueError):
        # OSError covers missing files, directories and permission problems;
        # ValueError covers paths the OS rejects outright (embedded NUL byte).
        return not_found()
    return Response(200, {}, data)
def _more_data_fn(conn): | |
# Wrapper around the connection to just request more bytes | |
def more_data() -> bytes: | |
# Read more data from the socket, 1024 bytes at a time. | |
# Unfortunately we don't know how big the request is until we read all the headers. | |
try: | |
data = conn.recv(1024) | |
except: | |
# It is possible to get an exception on timeout | |
raise ConnectionClosedError() | |
if not data: | |
raise ConnectionClosedError() | |
return data | |
return more_data | |
def handle_connection(conn, addr):
    """Serve requests on one client connection until it should be closed.

    Requests are read and answered one after another. The function returns,
    and the connection is closed by the ``with`` block, when the client
    disconnects or times out, when sending the response fails, or when the
    request does not ask for a persistent connection.

    Args:
        conn: the connected client socket.
        addr: the client's address (currently unused).
    """
    conn.settimeout(10)
    with conn:
        # One builder for the whole connection, created once up front.
        rb = RequestBuilder(_more_data_fn(conn))
        while True:
            try:
                request = rb.parse_request()
            except ConnectionClosedError:
                return
            print("Headers: ", request.headers)
            print("Content: ", request.body)
            response = handle_request(request)
            try:
                conn.sendall(response.as_http_bytes())
            except OSError:
                # The client went away mid-response; nothing left to do.
                return
            if not request.should_keepalive():
                # Leave the loop explicitly instead of closing the socket and
                # waiting for the next read on it to fail.
                return
def main_loop(host: str = "127.0.0.1", port: int = 12345):
    """Accept connections forever, handling each one on its own thread.

    Args:
        host: address to bind the listening socket to.
        port: TCP port to listen on.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        # Allow an immediate restart without "address already in use" while
        # old connections are still winding down.
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((host, port))
        s.listen()
        while True:
            conn, addr = s.accept()
            worker = threading.Thread(
                target=handle_connection,
                name=f"worker-{addr}",
                args=(conn, addr),
            )
            worker.start()


if __name__ == "__main__":
    main_loop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment