Skip to content

Instantly share code, notes, and snippets.

@gbrova
Last active September 27, 2020 13:11
Show Gist options
  • Save gbrova/3493875a92875ade3259e192dc679d94 to your computer and use it in GitHub Desktop.
Save gbrova/3493875a92875ade3259e192dc679d94 to your computer and use it in GitHub Desktop.
Super simple HTTP server
import socket
import threading
from http import HTTPStatus
from pathlib import Path
from typing import List, Dict, Tuple
# Line terminator used between the lines of an HTTP message head.
CRLF = "\r\n"
# Text placed between a header name and its value.
KV_SEP = ": "
# Two consecutive line terminators: marks the end of the header section.
EMPTY_LINE = CRLF * 2
class ConnectionClosedError(Exception):
    """Signals that no further bytes can be read from the client connection."""
class Request:
    """A received HTTP request: raw request line, header mapping and body bytes."""

    def __init__(self, request_line, headers: dict, body: bytes):
        # Kept as the raw text, e.g. "GET /index.html HTTP/1.1"; it is split
        # on demand by the accessor methods below.
        self.request_line = request_line
        self.headers = headers
        self.body = body

    def should_keepalive(self):
        """Return True if the connection should stay open after the response.

        The default comes from the protocol version at the end of the request
        line: HTTP/1.0 closes, anything else persists. A ``Connection`` header
        (name matched case-insensitively) overrides that default when it
        contains the ``close`` or ``keep-alive`` token.
        """
        # Only the last field is the version. Searching the whole request line
        # would misfire on a path that happens to contain "HTTP/1.0".
        version = self.request_line.strip().rsplit(" ", 1)[-1]
        keepalive = version != "HTTP/1.0"

        for name, value in self.headers.items():
            if name.lower() != "connection":
                continue
            # The header value is a comma-separated list of options.
            options = {option.strip().lower() for option in value.split(",")}
            if "close" in options:
                keepalive = False
            elif "keep-alive" in options:
                keepalive = True
        return keepalive

    def path(self):
        """Return the request target without its leading character (the "/").

        Raises:
            ValueError: if the request line is not exactly three
                space-separated fields (method, target, version).
        """
        _method, target, _version = self.request_line.split(" ")
        return target[1:]
class RequestBuilder:
    """Read one HTTP request at a time from a chunked byte source.

    ``more_data_fn`` is called with no arguments and must return the next
    chunk of bytes. The read loops only stop once enough data has arrived, so
    the callable has to raise (rather than return ``b""``) when no more data
    will come.
    """

    def __init__(self, more_data_fn):
        self.more_data = more_data_fn
        # Bytes received beyond the end of the request parsed last; they are
        # the beginning of the next request and are consumed by the next
        # _read_header() call on this same builder.
        self._pending = b""

    def _read_header(self) -> Tuple[str, bytes]:
        """Read until the blank line that ends the header section.

        Returns:
            The header section decoded as ASCII (request line plus header
            lines, without the terminating blank line) and whatever bytes
            were received after the blank line.

        Raises:
            UnicodeDecodeError: if the header section itself is not ASCII.
        """
        terminator = EMPTY_LINE.encode("ascii")
        buffered = self._pending
        self._pending = b""
        while True:
            # Drop line breaks sent ahead of the request line, so that the
            # terminator can never match at offset 0 with an empty header.
            buffered = buffered.lstrip(b"\r\n")
            # Search the raw bytes: body data that arrived in the same chunk
            # may be binary and must not go through the ASCII decoder.
            end = buffered.find(terminator)
            if end != -1:
                break
            buffered += self.more_data()
        headers_str = buffered[:end].decode("ascii")
        continuation = buffered[end + len(terminator):]
        return headers_str, continuation

    def _parse_headers_str(self, headers_str: str) -> Tuple[str, Dict]:
        """Split the header section into the request line and a header dict.

        Duplicate header names are not accumulated: the last occurrence wins.
        Lines without a colon are skipped.
        """
        # The first line is the request line, which is not a header.
        request_line, *header_lines = headers_str.split(CRLF)
        colon = KV_SEP.strip()
        headers = {}
        for line in header_lines:
            # Split on the first colon only, so values that themselves contain
            # ": " stay intact and "Name:value" (no space) is accepted.
            name, found, value = line.partition(colon)
            if not found:
                continue
            headers[name.strip()] = value.strip()
        return request_line, headers

    def _read_remaining_body(self, continuation_bytes: bytes, total_payload_length: int) -> bytes:
        """Return exactly ``total_payload_length`` bytes of body.

        ``continuation_bytes`` is the part of the body (possibly empty, possibly
        more than the body) already received together with the headers. More
        chunks are requested until the declared length is reached. Bytes beyond
        that length are kept in ``self._pending`` instead of being returned as
        part of this body.
        """
        wanted = max(total_payload_length, 0)
        chunks = [continuation_bytes]
        received = len(continuation_bytes)
        while received < wanted:
            chunk = self.more_data()
            chunks.append(chunk)
            received += len(chunk)
        data = b"".join(chunks)
        self._pending = data[wanted:]
        return data[:wanted]

    def parse_request(self) -> "Request":
        """Read and parse the next complete request.

        Raises:
            ValueError: if the Content-Length header is not an integer.
            Whatever the ``more_data`` callable raises propagates unchanged.
        """
        headers_str, continuation_bytes = self._read_header()
        request_line, headers = self._parse_headers_str(headers_str)

        # Header names are case-insensitive, so "content-length" must count too.
        payload_length = 0
        for name, value in headers.items():
            if name.lower() == "content-length":
                payload_length = int(value)

        body = self._read_remaining_body(continuation_bytes, payload_length)
        return Request(request_line, headers, body)
class Response:
    """An HTTP response: numeric status code, header mapping and body bytes."""

    def __init__(self, status_code: int, headers: dict, body: bytes):
        self.status_code = status_code
        self.headers = headers
        self.body = body

    def make_length_header(self):
        """Set the Content-Length header to the current body size (in place)."""
        self.headers["Content-Length"] = str(len(self.body))

    def status_code_msg(self):
        """Return the reason phrase for the status code.

        Standard codes map to their registered phrase (200 -> "OK",
        404 -> "Not Found"). Codes unknown to ``http.HTTPStatus`` yield
        "UNKNOWN_MSG".
        """
        try:
            return HTTPStatus(self.status_code).phrase
        except ValueError:
            return "UNKNOWN_MSG"

    def as_http_bytes(self) -> bytes:
        """Serialize the response: status line, headers, blank line, body.

        The Content-Length header is (re)computed first, which updates
        ``self.headers``.

        Raises:
            UnicodeEncodeError: if a header name or value is not ASCII.
        """
        self.make_length_header()
        lines = [f"HTTP/1.1 {self.status_code} {self.status_code_msg()}"]
        lines.extend(f"{name}{KV_SEP}{value}" for name, value in self.headers.items())
        # EMPTY_LINE ends the last header line and adds the blank separator line.
        head = CRLF.join(lines) + EMPTY_LINE
        return head.encode("ascii") + self.body
def handle_request(request: "Request") -> "Response":
    """Serve the file named by the request path from the working directory.

    Returns a 200 response carrying the file's bytes, or a 404 response when
    the path does not name a readable regular file inside the working
    directory.

    Raises:
        ValueError: propagated from ``request.path()`` for a malformed
            request line.
    """
    path = request.path()
    data = None
    try:
        root = Path.cwd().resolve()
        target = (root / path).resolve()
        # The path comes straight from the client. After resolving ".."
        # segments, absolute paths and symlinks, only serve targets that are
        # still inside the working directory.
        if target == root or root in target.parents:
            # The context manager closes the file even if read() fails.
            with open(target, "rb") as served_file:
                data = served_file.read()
    except (FileNotFoundError, IsADirectoryError, NotADirectoryError, PermissionError, ValueError):
        # Missing, not a regular file, unreadable, or an unusable path (for
        # example an embedded NUL byte): all are reported as "not found".
        data = None

    if data is None:
        # That resource doesn't exist (or may not be served), return a 404!
        return Response(404, {}, "Not found :(".encode("utf-8"))
    return Response(200, {}, data)
def _more_data_fn(conn):
    """Wrap a connection in a zero-argument "give me more bytes" callable.

    The returned function reads up to 1024 bytes per call from ``conn`` and
    raises ConnectionClosedError when the read fails (timeouts included) or
    yields no data.
    """

    def more_data() -> bytes:
        # Read in fixed-size chunks: the size of the request is not known
        # until all the headers have been read.
        try:
            data = conn.recv(1024)
        except OSError as err:
            # Socket failures, including timeouts, are OSError subclasses.
            # Catching only those lets KeyboardInterrupt and SystemExit
            # propagate instead of being reported as a closed connection.
            raise ConnectionClosedError() from err
        if not data:
            # An empty read means the peer has closed the connection.
            raise ConnectionClosedError()
        return data

    return more_data
def handle_connection(conn, addr):
    """Serve requests on one accepted connection until it should be closed.

    Args:
        conn: the connected socket; it is closed when this function returns.
        addr: the peer address; not used here.

    The loop ends when the peer closes the connection or the 10 second socket
    timeout expires, when the response cannot be sent, or after answering a
    request that does not ask for keep-alive.
    """
    conn.settimeout(10)
    with conn:
        # The builder is created once per connection rather than once per request.
        rb = RequestBuilder(_more_data_fn(conn))
        while True:
            try:
                request = rb.parse_request()
            except ConnectionClosedError:
                return
            print("Headers: ", request.headers)
            print("Content: ", request.body)
            response = handle_request(request)
            try:
                conn.sendall(response.as_http_bytes())
            except OSError:
                # The peer went away while the response was being written.
                return
            if not request.should_keepalive():
                # Leave the loop explicitly; the `with` block closes the
                # socket. Looping again would only attempt a read on a
                # connection that is already finished.
                return
def main_loop(host: str = "127.0.0.1", port: int = 12345):
    """Accept TCP connections forever, handing each one to its own thread.

    Args:
        host: address to bind the listening socket to.
        port: TCP port to listen on.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        # Allow rebinding the port right after a restart instead of failing
        # with "address already in use" while old connections linger.
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((host, port))
        s.listen()
        while True:
            conn, addr = s.accept()
            worker = threading.Thread(
                target=handle_connection,
                name=f"worker-{addr}",
                args=(conn, addr),
                # Daemon threads do not keep the process alive once the accept
                # loop has been interrupted.
                daemon=True,
            )
            worker.start()
# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main_loop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment