Skip to content

Instantly share code, notes, and snippets.

@berdario
Last active August 29, 2015 14:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save berdario/34370a8bc39895cae139 to your computer and use it in GitHub Desktop.
Save berdario/34370a8bc39895cae139 to your computer and use it in GitHub Desktop.
#! /usr/bin/env python
import sys
import os
import locale
from collections import defaultdict
from socket import socket, AF_INET, SOCK_STREAM
from urllib.parse import urlparse
from contextlib import closing
from functools import wraps
import magic
from io import open
# Command-line arguments keyed by position, so an optional argument can be
# read with ``args.get(index, default)``.
args = {position: value for position, value in enumerate(sys.argv)}

# python-magic handle created with ``mime_encoding=True``; guess_type() uses
# it to fill in the ``charset`` parameter of text files.
encoding_guesser = magic.Magic(mime_encoding=True)

# Absolute path of the working directory when the module is loaded.
curdir = os.path.abspath('.')

# Second element of ``locale.getlocale()``.
ENCODING = locale.getlocale()[1]

# Request methods the server recognises; handle_client() answers anything
# else with 501.
HTTP_METHODS = frozenset((
    'OPTIONS',
    'GET',
    'HEAD',
    'POST',
    'PUT',
    'DELETE',
    'TRACE',
    'CONNECT',
))

# Protocol version placed at the start of every status line.
HTTP_VERSION = 'HTTP/1.1'
def parse_path(f):
    """Decorate a request handler so that it receives a filesystem path.

    The returned wrapper is called as ``handler(resource, headers)`` where
    *resource* is the raw request target (``bytes``) from the request line.
    The target is reduced to its path component, percent-decoded, resolved
    to an absolute path and passed to *f* in place of the raw target;
    *headers* is forwarded untouched.

    Any target that resolves outside ``curdir`` is replaced by ``curdir``
    itself, so *f* is never handed a path above the served directory.
    Symbolic links are not resolved by this check.
    """
    # Local import: ``unquote`` is not among the module-level imports.
    from urllib.parse import unquote

    @wraps(f)
    def handler(resource, headers):
        # Undecodable bytes become U+FFFD instead of raising; such a target
        # simply will not match any file.
        target = resource.decode('utf-8', 'replace')
        if target.startswith('/'):
            # origin-form ("/dir/file?query"): keep only the path part.
            target = target.partition('#')[0].partition('?')[0]
        else:
            # absolute-form ("http://host/dir/file") or anything else.
            target = urlparse(target).path or '/'
        # Decode %XX escapes *before* normalising, so an encoded ".." is
        # caught by the containment check below.
        target = unquote(target)
        path = os.path.abspath(target.lstrip('/') or '.')
        # A bare prefix test would also accept siblings such as
        # "<curdir>-private"; require an exact match or a separator-bounded
        # prefix instead.
        inside = path == curdir or path.startswith(os.path.join(curdir, ''))
        if not inside:
            path = curdir
        return f(path, headers)
    return handler
def guess_type(path):
    """Return the ``Content-Type`` header value for the file at *path*.

    The MIME type comes from ``magic.from_file(path, mime=True)``.  For
    types starting with ``text`` a ``; charset=...`` parameter is appended
    using the encoding reported by ``encoding_guesser``.
    """
    def as_text(value):
        # python-magic returns bytes in some releases and str in others;
        # calling .decode() unconditionally fails on the latter.
        return value.decode() if isinstance(value, bytes) else value

    typ = as_text(magic.from_file(path, mime=True))
    if typ.startswith('text'):
        return typ + "; charset=" + as_text(encoding_guesser.from_file(path))
    return typ
def file_browser(path):
    """Describe how to serve *path*.

    Returns a ``(content_reader, headers)`` pair:

    * *path* does not exist: ``content_reader`` is ``None``.
    * *path* is a directory: ``content_reader()`` returns the newline-joined
      names of its entries, skipping names that start with ``.``, encoded
      to bytes.
    * otherwise: ``content_reader()`` returns the file's bytes and the
      ``Content-Type`` header comes from ``guess_type``.

    ``headers`` always carries ``Connection: close``.  Nothing is read from
    disk until ``content_reader`` is called.
    """
    headers = {'Connection': 'close'}

    if not os.path.exists(path):
        return (None, headers)

    if os.path.isdir(path):
        def content_reader():
            visible = [name for name in os.listdir(path)
                       if not name.startswith('.')]
            return "\n".join(visible).encode()

        headers['Content-Type'] = 'text/plain; charset=UTF-8'
        return (content_reader, headers)

    headers['Content-Type'] = guess_type(path)

    def content_reader():
        with open(path, 'rb') as stream:
            return stream.read()

    return (content_reader, headers)
def build_response(resource, headers, handler):
    """Build the raw response bytes for the filesystem path *resource*.

    ``file_browser(resource)`` supplies a content reader and the response
    headers.  When there is no reader the path does not exist and a bare
    404 response is returned.  Otherwise *handler* is called as
    ``handler(status_line, headers_bytes, content_reader)`` and its return
    value is the response; it is expected to join the three parts with
    CRLF.

    *headers* (the request headers) is accepted for symmetry with the
    method handlers and is not inspected here.
    """
    content, response_headers = file_browser(resource)
    if content is None:
        return (HTTP_VERSION + " 404 File Not Found\r\n\r\n").encode()

    # A successfully served resource is "200 OK"; "202 Accepted" means the
    # request was merely queued for later processing.
    status_line = " ".join([HTTP_VERSION, '200', 'OK']).encode()
    header_lines = [':'.join(item) for item in response_headers.items()]
    # The trailing CRLF plus the handler's CRLF join yields the blank line
    # that separates headers from body.
    headers_bytes = ("\r\n".join(header_lines) + '\r\n').encode()
    return handler(status_line, headers_bytes, content)
@parse_path
def handle_GET(resource, headers):
    """Answer a GET request: status line, headers and the full body."""
    def assemble(status_line, headers_bytes, content_reader):
        # The body is only read here, once the response is being put
        # together.
        body = content_reader()
        return b"\r\n".join((status_line, headers_bytes, body))

    return build_response(resource, headers, assemble)
@parse_path
def handle_HEAD(resource, headers):
    """Answer a HEAD request: same status line and headers as GET, no body.

    The content reader is never invoked, so the resource is not read.
    """
    def handler(status_line, headers_bytes, content):
        # The final element must be bytes: joining a str with bytes raises
        # TypeError.  The empty element still produces the blank line that
        # terminates the header section.
        return b"\r\n".join([status_line, headers_bytes, b''])
    return build_response(resource, headers, handler)
def missing_method(resource, headers):
    """Return a 405 response for a known method that has no handler.

    Both arguments are ignored; they exist so this function has the same
    signature as the real method handlers.  The result is ``bytes`` because
    it is handed straight to ``socket.sendall``, which rejects ``str``.
    """
    return (HTTP_VERSION + " 405 Method Not Allowed\r\n\r\n").encode()
# Dispatch table from method name to handler.  Looking up a method that has
# no entry yields ``missing_method``.
methodmap = defaultdict(
    lambda: missing_method,
    {'GET': handle_GET, 'HEAD': handle_HEAD},
)
def handle_client(sock):
    """Read one HTTP request from *sock* and send back the response.

    Data is received until the blank line that ends the header section.
    If the peer closes the connection first, nothing is sent.  The request
    line and headers are then parsed and the request is answered with:

    * 400 if the request is empty, the request line does not have exactly
      three fields, or a header line has no colon;
    * 501 if the method is not in ``HTTP_METHODS``;
    * otherwise whatever ``methodmap[method](resource, headers)`` returns.

    Header names and values are passed to the handler as ``bytes``, split
    at the first colon and otherwise unmodified.
    """
    terminator = b"\r\n\r\n"
    data = b""
    # Search for the terminator anywhere in the buffer: a client may send
    # more bytes (a body, a pipelined request) right after the headers, in
    # which case the buffer never *ends* with it.
    while terminator not in data:
        chunk = sock.recv(1000)
        if not chunk:
            return
        data += chunk

    head = data.split(terminator, 1)[0]
    lines = head.strip().splitlines()

    headers = {}
    malformed = not lines
    for line in lines[1:]:
        name, colon, value = line.partition(b':')
        if not colon:
            malformed = True
            break
        headers[name] = value

    if not malformed:
        try:
            method, resource, _http_version = lines[0].split()
        except ValueError:
            malformed = True
        else:
            # latin-1 maps every byte, so an odd method token ends up as
            # an unknown method (501) instead of raising.
            method = method.decode('latin-1')

    if malformed:
        response = HTTP_VERSION + " 400 Bad Request\r\n\r\n"
    elif method not in HTTP_METHODS:
        response = HTTP_VERSION + " 501 Not Implemented\r\n\r\n"
    else:
        response = methodmap[method](resource, headers)

    # sendall() only accepts bytes; the literal responses above (and any
    # handler that returns text) are encoded here.
    if isinstance(response, str):
        response = response.encode()
    sock.sendall(response)
if __name__ == '__main__':
    # Listen on every interface; the port is the first command-line
    # argument, or 8080 when none is given.
    listener = socket(AF_INET, SOCK_STREAM)
    port = int(args.get(1, 8080))
    listener.bind(('', port))
    listener.listen(1000)
    # Connections are served one at a time; each one is closed as soon as
    # its request has been handled.
    while True:
        connection = listener.accept()[0]
        with closing(connection):
            handle_client(connection)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment