Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
3to2 porting of the 2to3 porting of the 3to2 porting of https://gist.github.com/berdario/39313b94cd08ebe6b3fb portingception!
#! /usr/bin/env python
from __future__ import with_statement
import sys
import os
import locale
from collections import defaultdict
from socket import socket, AF_INET, SOCK_STREAM
from urlparse import urlparse
from contextlib import closing
from functools import wraps
import magic
from io import open
from itertools import imap
args = dict(enumerate(sys.argv))
encoding_guesser = magic.Magic(mime_encoding=True)
curdir = os.path.abspath(u'.')
ENCODING = locale.getlocale()[1]
HTTP_METHODS = frozenset(set([u'OPTIONS', u'GET', u'HEAD', u'POST', u'PUT', u'DELETE', u'TRACE', u'CONNECT']))
HTTP_VERSION = u'HTTP/1.1'
def parse_path(f):
@wraps(f)
def handler(resource, headers):
resource = resource.decode()
if not resource.startswith(u'/'):
resource = urlparse(resource).path or u'/'
resource = os.path.abspath(resource[1:] or u'.')
if not resource.startswith(curdir):
resource = curdir
return f(resource, headers)
return handler
def guess_type(path):
typ = magic.from_file(path, mime=True).decode()
if typ.startswith(u'text'):
return typ + u"; charset=" + encoding_guesser.from_file(path).decode()
else:
return typ
def file_browser(path):
headers = {u'Connection': u'close'}
if not os.path.exists(path):
return (None, headers)
elif os.path.isdir(path):
content_reader = lambda: u"\n".join(p for p in os.listdir(path) if not p.startswith(u'.')).encode('utf-8')
headers.update({u'Content-Type': u'text/plain; charset=UTF-8'})
return (content_reader, headers)
else:
headers.update({u'Content-Type': guess_type(path)})
def content_reader():
with open(path, u'rb') as f:
return f.read()
return (content_reader, headers)
def build_response(resource, headers, handler):
content, response_headers = file_browser(resource)
if content is None:
return (HTTP_VERSION + u" 404 File Not Found\r\n\r\n").encode()
else:
status_line = u" ".join([HTTP_VERSION, u'202', u'Accepted']).encode()
headers_bytes = (u"\r\n".join(imap(u':'.join, list(response_headers.items()))) + u'\r\n').encode()
return handler(status_line, headers_bytes, content)
@parse_path
def handle_GET(resource, headers):
def handler(status_line, headers_bytes, content_reader):
return "\r\n".join([status_line, headers_bytes, content_reader()])
return build_response(resource, headers, handler)
@parse_path
def handle_HEAD(resource, headers):
def handler(status_line, headers_bytes, content):
return "\r\n".join([status_line, headers_bytes, u''])
return build_response(resource, headers, handler)
def missing_method(resource, headers):
return HTTP_VERSION + u" 405 Method Not Allowed\r\n\r\n"
methodmap = defaultdict(lambda : missing_method)
methodmap.update({u'GET': handle_GET, u'HEAD': handle_HEAD})
def handle_client(sock):
data = sock.recv(1000)
while not data.endswith("\r\n\r\n"):
newdata = sock.recv(1000)
if not newdata:
return
data += newdata
headers = data.strip().splitlines()
request, headers = headers[0], headers[1:]
headers = dict(imap(lambda s: s.split(':', 1), headers))
try:
method, resource, http_version = request.split()
except ValueError:
response = HTTP_VERSION + u" 400 Bad Request\r\n\r\n"
else:
method = method.decode()
if method not in HTTP_METHODS:
response = HTTP_VERSION + u" 501 Not Implemented\r\n\r\n"
else:
response = methodmap[method](resource, headers)
sock.sendall(response)
if __name__ == u'__main__':
sock = socket(AF_INET, SOCK_STREAM)
sock.bind((u'', int(args.get(1, 8080))))
sock.listen(1000)
while True:
with closing(sock.accept()[0]) as newsock:
handle_client(newsock)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.