Skip to content

Instantly share code, notes, and snippets.

@berdario
Created December 7, 2014 21:22
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save berdario/39313b94cd08ebe6b3fb to your computer and use it in GitHub Desktop.
Save berdario/39313b94cd08ebe6b3fb to your computer and use it in GitHub Desktop.
A toy HTTP server in pure Python3 I wrote in September 2013
#! /usr/bin/env python
import sys
import os
import locale
from collections import defaultdict
from socket import socket, AF_INET, SOCK_STREAM
from urllib.parse import urlparse
from contextlib import closing
from functools import wraps
import magic
# Protocol constants used when building and validating messages.
HTTP_VERSION = b'HTTP/1.1'
HTTP_METHODS = frozenset((
    b'OPTIONS', b'GET', b'HEAD', b'POST',
    b'PUT', b'DELETE', b'TRACE', b'CONNECT',
))

# Command-line arguments keyed by position, so optional ones can be fetched
# with ``args.get(index, default)``.
args = {position: value for position, value in enumerate(sys.argv)}

# Absolute path (bytes) of the working directory at import time; parse_path
# falls back to it for requests that resolve elsewhere.
curdir = os.path.abspath(b'.')

# Encoding component of the current locale (may be None). Not referenced by
# the rest of the visible code.
ENCODING = locale.getlocale()[1]

# libmagic handle configured to report a file's character encoding; used by
# guess_type for text files.
encoding_guesser = magic.Magic(mime_encoding=True)
def parse_path(f):
    """Decorate a request handler so it receives a filesystem path.

    The returned wrapper is called as ``handler(resource, headers)`` where
    *resource* is the raw request target (bytes). The target is reduced to
    its path component, resolved to an absolute path relative to the current
    working directory, and passed on to ``f(resource, headers)``.

    Any target that resolves outside the module-level ``curdir`` is replaced
    by ``curdir`` itself, so a request can never name a file outside the
    served tree by using ``..`` segments.
    """
    @wraps(f)
    def handler(resource, headers):
        if resource.startswith(b'/'):
            # Origin-form target: the query string and fragment are not part
            # of the file name, so drop them before touching the filesystem.
            resource = resource.partition(b'#')[0].partition(b'?')[0]
        else:
            # Absolute-form target (e.g. b'http://host/path'): keep the path.
            resource = urlparse(resource).path or b'/'
        resource = os.path.abspath(resource[1:] or b'.')
        # Compare against the root *with a trailing separator*: a bare
        # prefix test would also accept siblings such as b'<root>-private'.
        root_prefix = os.path.join(curdir, b'')
        if resource != curdir and not resource.startswith(root_prefix):
            resource = curdir
        return f(resource, headers)
    return handler
def guess_type(path):
    """Return the Content-Type header value for the file at *path*, as bytes.

    The MIME type comes from ``magic.from_file``; for ``text/*`` types the
    character encoding reported by ``encoding_guesser`` is appended as a
    ``charset`` parameter.

    The result is always bytes because build_response joins header names and
    values with ``b':'.join``. Depending on the installed python-magic
    version the library hands back either ``str`` or ``bytes``, so both are
    normalised here.
    """
    def as_bytes(value):
        # MIME types and charset names are plain ASCII tokens.
        return value.encode('ascii') if isinstance(value, str) else value

    typ = as_bytes(magic.from_file(path, mime=True))
    if typ.startswith(b'text'):
        return typ + b"; charset=" + as_bytes(encoding_guesser.from_file(path))
    return typ
def file_browser(path):
    """Describe how to serve *path* (a bytes filesystem path).

    Returns a ``(content_reader, headers)`` tuple:

    * *content_reader* is ``None`` when *path* does not exist; otherwise it
      is a zero-argument callable that produces the response body as bytes.
      The body is only read when the callable is invoked.
    * *headers* is a dict of bytes header names to bytes values. It always
      contains ``Connection: close`` and, when the path exists, a
      ``Content-Type``.

    A directory is rendered as a newline-separated, sorted list of its
    entries, omitting names that start with a dot.
    """
    headers = {b'Connection': b'close'}
    if not os.path.exists(path):
        return (None, headers)

    if os.path.isdir(path):
        def content_reader():
            visible = (p for p in os.listdir(path) if not p.startswith(b'.'))
            # os.listdir order is arbitrary; sort for a stable listing.
            return b"\n".join(sorted(visible))
        headers[b'Content-Type'] = b'text/plain; charset=UTF-8'
        return (content_reader, headers)

    # os.fsdecode round-trips any file name using the filesystem encoding,
    # whereas a plain .decode() raises on names that are not valid UTF-8.
    headers[b'Content-Type'] = guess_type(os.fsdecode(path))

    def content_reader():
        with open(path, 'rb') as f:
            return f.read()
    return (content_reader, headers)
def build_response(resource, headers, handler):
    """Build the raw HTTP response (bytes) for a resolved filesystem path.

    *resource* is looked up with file_browser. If it does not exist a bare
    404 response is returned. Otherwise the status line and the serialised
    response headers are passed, together with the lazy content reader, to
    ``handler(status_line, headers_bytes, content)``, whose return value is
    the response. *headers* (the request headers) is accepted for interface
    symmetry and is not consulted here.
    """
    content, response_headers = file_browser(resource)
    if content is None:
        return HTTP_VERSION + b" 404 File Not Found\r\n\r\n"
    # A request that was fulfilled is 200 OK; 202 Accepted would tell the
    # client that processing has not completed yet.
    status_line = b" ".join([HTTP_VERSION, b'200', b'OK'])
    # The trailing CRLF plus the CRLF the handler inserts before the body
    # form the blank line that ends the header section.
    headers_bytes = b"\r\n".join(map(b':'.join, response_headers.items())) + b'\r\n'
    return handler(status_line, headers_bytes, content)
@parse_path
def handle_GET(resource, headers):
    """Answer a GET request: status line, headers and the resource body."""
    def assemble(status_line, headers_bytes, content_reader):
        # The body is only read once a response is actually being built.
        body = content_reader()
        return b"\r\n".join((status_line, headers_bytes, body))

    return build_response(resource, headers, assemble)
@parse_path
def handle_HEAD(resource, headers):
    """Answer a HEAD request: same status line and headers as GET, no body."""
    def assemble(status_line, headers_bytes, content):
        # The content reader is deliberately never invoked for HEAD.
        empty_body = b''
        return b"\r\n".join((status_line, headers_bytes, empty_body))

    return build_response(resource, headers, assemble)
def missing_method(resource, headers):
    """Fallback handler: reply 405 regardless of the resource or headers."""
    return b"".join((HTTP_VERSION, b" 405 Method Not Allowed\r\n\r\n"))
# Dispatch table from request method to handler. Any method without an
# explicit entry resolves to missing_method.
methodmap = defaultdict(lambda: missing_method)
methodmap[b'GET'] = handle_GET
methodmap[b'HEAD'] = handle_HEAD
def handle_client(sock):
    """Read one HTTP request from *sock* and send back a single response.

    The request head is read in chunks until the blank line that terminates
    it has arrived. If the peer closes the connection before that, nothing
    is sent. Responses:

    * 400 for an oversized head, a malformed header line, or a request line
      that does not consist of exactly three fields;
    * 501 for a method that is not in ``HTTP_METHODS``;
    * otherwise whatever the handler registered in ``methodmap`` returns.

    The socket is not closed here; that is left to the caller.
    """
    terminator = b"\r\n\r\n"
    max_head = 65536  # upper bound on buffered bytes from an untrusted peer
    bad_request = HTTP_VERSION + b" 400 Bad Request\r\n\r\n"

    data = sock.recv(1000)
    # Look for the terminator anywhere in the buffer: testing only the tail
    # would wait forever when a body or a second request follows the head.
    # Leading blank lines are ignored so they cannot be mistaken for it.
    while terminator not in data.lstrip(b"\r\n"):
        if len(data) > max_head:
            sock.sendall(bad_request)
            return
        newdata = sock.recv(1000)
        if not newdata:
            return
        data += newdata

    # The head starts with a non-newline byte, so there is always a first line.
    head = data.lstrip(b"\r\n").partition(terminator)[0]
    request, *header_lines = head.splitlines()

    headers = {}
    for line in header_lines:
        name, colon, value = line.partition(b':')
        if not colon:
            # A header line without a colon is a protocol error, not a crash.
            sock.sendall(bad_request)
            return
        headers[name] = value

    try:
        method, resource, http_version = request.split()
    except ValueError:
        response = bad_request
    else:
        if method not in HTTP_METHODS:
            response = HTTP_VERSION + b" 501 Not Implemented\r\n\r\n"
        else:
            response = methodmap[method](resource, headers)
    sock.sendall(response)
if __name__ == '__main__':
    # Serve on all interfaces; the port is the first command-line argument
    # and defaults to 8080. Connections are handled one at a time.
    with closing(socket(AF_INET, SOCK_STREAM)) as sock:
        sock.bind(('', int(args.get(1, 8080))))
        sock.listen(1000)
        while True:
            with closing(sock.accept()[0]) as newsock:
                try:
                    handle_client(newsock)
                except OSError as err:
                    # A reset or broken connection (or an unreadable file)
                    # affects only this client; keep accepting new ones.
                    print('error while serving client:', err, file=sys.stderr)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment