Created
December 7, 2014 21:37
-
-
Save berdario/cbccaf7f36d61840e0ed to your computer and use it in GitHub Desktop.
3to2 port of the 2to3 port of the 3to2 port of https://gist.github.com/berdario/39313b94cd08ebe6b3fb (portingception!)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python | |
from __future__ import with_statement | |
import sys | |
import os | |
import locale | |
from collections import defaultdict | |
from socket import socket, AF_INET, SOCK_STREAM | |
from urlparse import urlparse | |
from contextlib import closing | |
from functools import wraps | |
import magic | |
from io import open | |
from itertools import imap | |
# Protocol constants used when validating requests and building status lines.
HTTP_VERSION = u'HTTP/1.1'
HTTP_METHODS = frozenset([
    u'OPTIONS', u'GET', u'HEAD', u'POST',
    u'PUT', u'DELETE', u'TRACE', u'CONNECT',
])

# Command-line arguments keyed by position, so an absent argument can be
# looked up with .get() and a default instead of raising IndexError.
args = dict(enumerate(sys.argv))

# Absolute path of the directory the process was started in; request paths
# are resolved against it and confined to it.
curdir = os.path.abspath(u'.')

# Encoding component of the current locale setting (may be None).
ENCODING = locale.getlocale()[1]

# Shared python-magic detector, created with mime_encoding=True; it is used
# to fill in the charset of text responses.
encoding_guesser = magic.Magic(mime_encoding=True)
def parse_path(f):
    """Decorate a request handler so it receives a safe filesystem path.

    The wrapped handler is called as ``handler(resource, headers)`` where
    ``resource`` is the raw (byte string) request target. It is converted to
    an absolute filesystem path before ``f(resource, headers)`` is invoked:

    * absolute-form targets (``http://host/path``) are reduced to their path;
    * the leading ``/`` is dropped and the rest is resolved with
      ``os.path.abspath`` (which also collapses ``..`` components);
    * anything that ends up outside ``curdir`` is replaced by ``curdir``.
    """
    @wraps(f)
    def handler(resource, headers):
        # Undecodable bytes become U+FFFD instead of raising; such a path
        # will simply not exist on disk and be reported as missing.
        resource = resource.decode('utf-8', 'replace')
        if not resource.startswith(u'/'):
            resource = urlparse(resource).path or u'/'
        resource = os.path.abspath(resource[1:] or u'.')

        # A bare startswith(curdir) would also accept sibling directories
        # such as "<curdir>-private", so compare against curdir followed by
        # a path separator (os.path.join adds one only if it is missing).
        root_prefix = os.path.join(curdir, u'')
        if resource != curdir and not resource.startswith(root_prefix):
            resource = curdir
        return f(resource, headers)
    return handler
def guess_type(path):
    """Return the Content-Type header value for the file at *path*.

    The MIME type is obtained from ``magic.from_file(path, mime=True)``.
    When it starts with ``text``, a ``; charset=`` parameter is appended
    using the value reported by the module-level ``encoding_guesser``.
    """
    mime = magic.from_file(path, mime=True).decode()
    if not mime.startswith(u'text'):
        return mime
    charset = encoding_guesser.from_file(path).decode()
    return mime + u"; charset=" + charset
def file_browser(path):
    """Describe how to serve *path*.

    Returns a ``(content_reader, headers)`` pair:

    * ``content_reader`` is ``None`` when *path* does not exist; otherwise it
      is a zero-argument callable that returns the response body as bytes,
      so the body is only produced if the caller actually needs it.
    * ``headers`` is a dict of response headers. It always contains
      ``Connection: close`` and, for existing paths, a ``Content-Type``.

    Directories are rendered as a newline-separated, UTF-8 encoded listing of
    their entries, skipping names that start with a dot. Regular files are
    returned verbatim, with the content type taken from ``guess_type``.
    """
    headers = {u'Connection': u'close'}
    if not os.path.exists(path):
        return (None, headers)

    if os.path.isdir(path):
        headers[u'Content-Type'] = u'text/plain; charset=UTF-8'

        def content_reader():
            # os.listdir returns entries in arbitrary order; sort them so the
            # listing is stable between requests.
            visible = sorted(
                p for p in os.listdir(path) if not p.startswith(u'.')
            )
            return u"\n".join(visible).encode('utf-8')
    else:
        headers[u'Content-Type'] = guess_type(path)

        def content_reader():
            with open(path, u'rb') as f:
                return f.read()

    return (content_reader, headers)
def build_response(resource, headers, handler):
    """Build the raw HTTP response for the filesystem path *resource*.

    ``file_browser(resource)`` decides what to serve. If the path does not
    exist, a bare ``404`` response is returned as bytes. Otherwise the
    encoded status line and header block are passed, together with the lazy
    content reader, to ``handler(status_line, headers_bytes, content)``,
    whose return value becomes the response.

    *headers* (the request headers) is accepted for interface symmetry with
    the method handlers and is not consulted here.
    """
    content, response_headers = file_browser(resource)
    if content is None:
        return (HTTP_VERSION + u" 404 File Not Found\r\n\r\n").encode()

    # A request that was served successfully is "200 OK"; "202 Accepted"
    # would tell the client that processing has not completed yet.
    status_line = u" ".join([HTTP_VERSION, u'200', u'OK']).encode()
    header_lines = [u':'.join(item) for item in response_headers.items()]
    # The trailing CRLF, combined with the CRLF the handler joins with,
    # produces the blank line that ends the header section.
    headers_bytes = (u"\r\n".join(header_lines) + u'\r\n').encode()
    return handler(status_line, headers_bytes, content)
@parse_path
def handle_GET(resource, headers):
    """Answer a GET request: status line, headers and the full body."""
    def assemble(status_line, headers_bytes, content_reader):
        # The body is only read here, once the response is being assembled.
        body = content_reader()
        parts = [status_line, headers_bytes, body]
        return "\r\n".join(parts)

    return build_response(resource, headers, assemble)
@parse_path
def handle_HEAD(resource, headers):
    """Answer a HEAD request: same status line and headers as GET, no body.

    The content reader supplied by ``build_response`` is never called, so
    the file or directory is not read.
    """
    def handler(status_line, headers_bytes, content):
        # Join with an empty *byte* string: mixing in u'' would turn the
        # whole response into unicode, unlike the bytes returned for GET.
        return b"\r\n".join([status_line, headers_bytes, b""])

    return build_response(resource, headers, handler)
def missing_method(resource, headers):
    """Return a ``405 Method Not Allowed`` response as bytes.

    Used for methods that are valid HTTP but have no handler registered.
    Both arguments are ignored; they exist so this function can be called
    exactly like the real method handlers.
    """
    # Encoded so that every handler hands bytes to the socket, matching the
    # 404 response produced by build_response.
    return (HTTP_VERSION + u" 405 Method Not Allowed\r\n\r\n").encode()
# Dispatch table from HTTP method name to handler. Any method without an
# explicit entry falls back to missing_method.
methodmap = defaultdict(lambda: missing_method)
methodmap[u'GET'] = handle_GET
methodmap[u'HEAD'] = handle_HEAD
def handle_client(sock):
    """Read one HTTP request from *sock* and send back the response.

    Data is received until the blank line that ends the header section has
    arrived. If the peer closes the connection before that, nothing is sent.
    The request line is then validated:

    * not exactly three whitespace-separated tokens, or a method that is not
      ASCII -> ``400 Bad Request``;
    * a method outside ``HTTP_METHODS`` -> ``501 Not Implemented``;
    * otherwise the handler registered in ``methodmap`` is called with the
      raw request target and a dict of the raw header lines.

    The socket is not closed here; that is left to the caller.
    """
    terminator = b"\r\n\r\n"

    # NOTE(review): there is no upper bound on how much header data is
    # buffered; consider adding one if this is exposed to untrusted clients.
    data = sock.recv(1000)
    # Blank lines before the request line are skipped, and the terminator is
    # searched for anywhere in the buffer: checking only the *end* of the
    # buffer would wait forever when a body follows the headers.
    while terminator not in data.lstrip():
        newdata = sock.recv(1000)
        if not newdata:
            return
        data += newdata

    head = data.lstrip().split(terminator, 1)[0]
    lines = head.splitlines()
    request, header_lines = lines[0], lines[1:]
    # Lines without a colon are not header fields; skipping them keeps a
    # malformed request from raising inside dict().
    headers = dict(
        line.split(b':', 1) for line in header_lines if b':' in line
    )

    try:
        # Unpacking enforces the "method target version" shape; the version
        # token itself is not inspected.
        method, resource, http_version = request.split()
        # UnicodeDecodeError is a ValueError, so a non-ASCII method is
        # reported as a bad request as well.
        method = method.decode('ascii')
    except ValueError:
        response = HTTP_VERSION + u" 400 Bad Request\r\n\r\n"
    else:
        if method not in HTTP_METHODS:
            response = HTTP_VERSION + u" 501 Not Implemented\r\n\r\n"
        else:
            response = methodmap[method](resource, headers)

    # Always hand bytes to the socket, whatever the producer returned.
    if isinstance(response, type(u'')):
        response = response.encode()
    sock.sendall(response)
# Script entry point: serve the current directory over HTTP on the port given
# as the first command-line argument (default 8080), one client at a time.
if __name__ == u'__main__':
    port = int(args.get(1, 8080))
    # closing() makes sure the listening socket is released when the loop is
    # left, e.g. on Ctrl-C.
    with closing(socket(AF_INET, SOCK_STREAM)) as sock:
        sock.bind((u'', port))
        sock.listen(1000)
        while True:
            client = sock.accept()[0]
            try:
                with closing(client) as newsock:
                    handle_client(newsock)
            except EnvironmentError as exc:
                # A failing client (reset connection, unreadable file, ...)
                # must not take the whole server down; report and carry on.
                sys.stderr.write("error while serving client: %s\n" % (exc,))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment