Skip to content

Instantly share code, notes, and snippets.

@berdario

berdario/dummyhttp.py Secret

Created Dec 7, 2014
Embed
What would you like to do?
Python 2 toy HTTP server (automatically converted from https://gist.github.com/39313b94cd08ebe6b3fb)
#! /usr/bin/env python
from __future__ import with_statement
import sys
import os
import locale
from collections import defaultdict
from socket import socket, AF_INET, SOCK_STREAM
from urlparse import urlparse
from contextlib import closing
from functools import wraps
import magic
from io import open
from itertools import imap
# Command-line arguments keyed by position, so that an absent argument can be
# defaulted with ``args.get(index, fallback)``.
args = dict(enumerate(sys.argv))

# python-magic detector created with mime_encoding=True; guess_type() uses it
# to obtain the charset of text files.
encoding_guesser = magic.Magic(mime_encoding=True)

# Absolute path of the working directory at import time; parse_path() treats
# it as the root that requested resources must stay under.
curdir = os.path.abspath('.')

# Encoding component of the current locale setting (may be None).
ENCODING = locale.getlocale()[1]

# Request methods the server recognises; anything else is answered with 501.
HTTP_METHODS = frozenset((
    'OPTIONS',
    'GET',
    'HEAD',
    'POST',
    'PUT',
    'DELETE',
    'TRACE',
    'CONNECT',
))

# Protocol version placed at the start of every status line.
HTTP_VERSION = 'HTTP/1.1'
def parse_path(f):
    """Decorate a request handler so it receives a filesystem path.

    The wrapped handler is called as ``handler(resource, headers)`` with the
    raw request target. The target is converted to an absolute path relative
    to the current working directory and ``f(path, headers)`` is returned.
    Any target that resolves outside ``curdir`` is replaced by ``curdir``
    itself. Percent-escapes in the target are not decoded.
    """
    @wraps(f)
    def handler(resource, headers):
        if resource.startswith('/'):
            # Origin-form target: the query string is not part of the file
            # name, so "/index.html?v=1" must map to "index.html".
            resource = resource.partition('?')[0]
        else:
            # Absolute-form (or other) target: keep only its path component.
            resource = urlparse(resource).path or '/'
        # Drop the leading slash so the path is resolved against the cwd.
        resource = os.path.abspath(resource[1:] or '.')
        # Compare against "curdir + separator" rather than the bare prefix:
        # a plain startswith(curdir) would also accept sibling directories
        # such as "<curdir>-private". os.path.join(curdir, '') appends the
        # separator only when it is not already there (e.g. curdir == '/').
        root_prefix = os.path.join(curdir, '')
        if resource != curdir and not resource.startswith(root_prefix):
            resource = curdir
        return f(resource, headers)
    return handler
def guess_type(path):
    """Return the Content-Type value for the file at *path*.

    The MIME type reported by ``magic.from_file`` is returned unchanged unless
    it starts with ``text``; in that case a ``; charset=`` parameter obtained
    from ``encoding_guesser`` is appended.
    """
    mime_type = magic.from_file(path, mime=True)
    if not mime_type.startswith('text'):
        return mime_type
    # Only textual content gets a charset parameter.
    return mime_type + "; charset=" + encoding_guesser.from_file(path)
def file_browser(path):
    """Describe how to serve the filesystem entry at *path*.

    Returns a ``(content_reader, headers)`` tuple. ``headers`` is a dict of
    response headers and always contains ``Connection: close``.

    * If *path* does not exist, ``content_reader`` is ``None``.
    * If *path* is a directory, ``content_reader()`` returns the names of its
      entries that do not start with ``.``, joined by newlines, and the
      Content-Type is ``text/plain; charset=UTF-8``.
    * Otherwise ``content_reader()`` returns the file's raw bytes and the
      Content-Type comes from ``guess_type``.
    """
    headers = {'Connection': 'close'}
    if not os.path.exists(path):
        return (None, headers)

    if os.path.isdir(path):
        def list_directory():
            return "\n".join(
                name for name in os.listdir(path) if not name.startswith('.'))
        headers['Content-Type'] = 'text/plain; charset=UTF-8'
        return (list_directory, headers)

    # guess_type() is handed a text path. Decode byte paths with the
    # filesystem encoding: a bare .decode() uses the ASCII default and raises
    # UnicodeDecodeError for any non-ASCII file name.
    if isinstance(path, bytes):
        text_path = path.decode(sys.getfilesystemencoding() or 'utf-8')
    else:
        text_path = path
    headers['Content-Type'] = guess_type(text_path)

    def read_file():
        # The file is opened lazily, only when the body is actually needed.
        with open(path, u'rb') as f:
            return f.read()
    return (read_file, headers)
def build_response(resource, headers, handler):
    """Build the response for *resource* using a method-specific *handler*.

    resource -- filesystem path to serve (looked up through file_browser).
    headers  -- request headers; not used by this function.
    handler  -- callable ``handler(status_line, headers_bytes, content)``
                returning the complete response; ``content`` is the
                zero-argument body reader supplied by file_browser.

    Returns a bare 404 response when the resource does not exist, otherwise
    whatever *handler* returns.
    """
    content, response_headers = file_browser(resource)
    if content is None:
        return HTTP_VERSION + " 404 File Not Found\r\n\r\n"

    # A request that was fulfilled is "200 OK"; "202 Accepted" means the
    # request was queued for processing that has not completed yet.
    status_line = " ".join([HTTP_VERSION, '200', 'OK'])
    header_lines = [':'.join(item) for item in response_headers.items()]
    # Trailing CRLF terminates the last header line; the handler adds the
    # blank line that separates headers from the body.
    headers_bytes = "\r\n".join(header_lines) + '\r\n'
    return handler(status_line, headers_bytes, content)
@parse_path
def handle_GET(resource, headers):
    """Answer a GET request with status line, headers and resource body."""
    def render(status_line, headers_bytes, content_reader):
        # The body is read only here, i.e. only when build_response found
        # the resource and invoked this callback.
        body = content_reader()
        return "\r\n".join((status_line, headers_bytes, body))
    return build_response(resource, headers, render)
@parse_path
def handle_HEAD(resource, headers):
    """Answer a HEAD request: same status line and headers as GET, no body."""
    def render(status_line, headers_bytes, content):
        # The content reader is intentionally never called; the response
        # stops after the blank line that ends the header section.
        return status_line + "\r\n" + headers_bytes + "\r\n"
    return build_response(resource, headers, render)
def missing_method(resource, headers):
    """Return a 405 response for a request method that has no handler.

    Both arguments are accepted for signature compatibility with the real
    handlers and are ignored.
    """
    # A 405 response is required to carry an Allow header naming the methods
    # the resource supports. Keep this list in sync with the handlers
    # registered in methodmap (currently GET and HEAD).
    return HTTP_VERSION + " 405 Method Not Allowed\r\nAllow: GET, HEAD\r\n\r\n"
# Dispatch table from request method to handler. Methods without an entry
# resolve to missing_method through the defaultdict factory.
methodmap = defaultdict(lambda: missing_method)
methodmap['GET'] = handle_GET
methodmap['HEAD'] = handle_HEAD
def _parse_request_head(head):
    """Split a request head into ``(method, resource, headers)``.

    *head* is the request line plus header lines, without the terminating
    blank line. Raises ValueError when the head is empty, the request line
    does not consist of exactly three fields, or a header line has no colon.
    """
    lines = head.strip().splitlines()
    if not lines:
        raise ValueError("empty request")
    # Unpacking raises ValueError unless there are exactly three fields.
    method, resource, _version = lines[0].split()
    headers = {}
    for line in lines[1:]:
        name, separator, value = line.partition(':')
        if not separator:
            raise ValueError("malformed header line: %r" % (line,))
        # Whitespace around the value is not part of the value.
        headers[name] = value.strip()
    return method, resource, headers


def handle_client(sock):
    """Read one HTTP request from *sock*, dispatch it and send the response.

    Returns without sending anything if the peer closes the connection before
    the blank line that ends the header section has arrived. Malformed
    requests are answered with 400, methods outside HTTP_METHODS with 501,
    and everything else is dispatched through methodmap.
    """
    data = sock.recv(1000)
    # Look for the blank line anywhere in the buffer: testing only the end
    # of the buffer would block forever on a request that carries a body or
    # is followed by further bytes.
    while "\r\n\r\n" not in data:
        chunk = sock.recv(1000)
        if not chunk:
            return
        data += chunk
    head = data.split("\r\n\r\n", 1)[0]

    try:
        method, resource, headers = _parse_request_head(head)
    except ValueError:
        # Covers a bad request line as well as bad header lines, so that a
        # malformed request cannot raise out of the handler.
        response = HTTP_VERSION + " 400 Bad Request\r\n\r\n"
    else:
        if method not in HTTP_METHODS:
            response = HTTP_VERSION + " 501 Not Implemented\r\n\r\n"
        else:
            response = methodmap[method](resource, headers)
    sock.sendall(response)
if __name__ == u'__main__':
    # Imported here because only the script entry point needs these names.
    from socket import SOL_SOCKET, SO_REUSEADDR, error as socket_error

    # closing() releases the listening socket when the loop is left, e.g. on
    # KeyboardInterrupt.
    with closing(socket(AF_INET, SOCK_STREAM)) as sock:
        # Lets the server be restarted immediately instead of failing with
        # "address already in use" while old connections sit in TIME_WAIT.
        sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        # Listen on all interfaces; the port is the first command-line
        # argument, defaulting to 8080.
        sock.bind((u'', int(args.get(1, 8080))))
        sock.listen(1000)
        while True:
            # Clients are served one at a time; each connection is closed
            # as soon as its single request has been handled.
            with closing(sock.accept()[0]) as newsock:
                try:
                    handle_client(newsock)
                except socket_error as exc:
                    # A client that resets or drops its connection must not
                    # take the whole server down.
                    sys.stderr.write("client connection error: %s\n" % (exc,))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment