-
-
Save berdario/34370a8bc39895cae139 to your computer and use it in GitHub Desktop.
Python3 porting of https://gist.github.com/berdario/8abfd9020894e72b310a
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python | |
import sys | |
import os | |
import locale | |
from collections import defaultdict | |
from socket import socket, AF_INET, SOCK_STREAM | |
from urllib.parse import urlparse | |
from contextlib import closing | |
from functools import wraps | |
import magic | |
from io import open | |
args = dict(enumerate(sys.argv)) | |
encoding_guesser = magic.Magic(mime_encoding=True) | |
curdir = os.path.abspath('.') | |
ENCODING = locale.getlocale()[1] | |
HTTP_METHODS = frozenset(set(['OPTIONS', 'GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'TRACE', 'CONNECT'])) | |
HTTP_VERSION = 'HTTP/1.1' | |
def parse_path(f): | |
@wraps(f) | |
def handler(resource, headers): | |
resource = resource.decode() | |
if not resource.startswith('/'): | |
resource = urlparse(resource).path or '/' | |
resource = os.path.abspath(resource[1:] or '.') | |
if not resource.startswith(curdir): | |
resource = curdir | |
return f(resource, headers) | |
return handler | |
def guess_type(path): | |
typ = magic.from_file(path, mime=True).decode() | |
if typ.startswith('text'): | |
return typ + "; charset=" + encoding_guesser.from_file(path).decode() | |
else: | |
return typ | |
def file_browser(path): | |
headers = {'Connection': 'close'} | |
if not os.path.exists(path): | |
return (None, headers) | |
elif os.path.isdir(path): | |
content_reader = lambda: "\n".join(p for p in os.listdir(path) if not p.startswith('.')).encode() | |
headers.update({'Content-Type': 'text/plain; charset=UTF-8'}) | |
return (content_reader, headers) | |
else: | |
headers.update({'Content-Type': guess_type(path)}) | |
def content_reader(): | |
with open(path, 'rb') as f: | |
return f.read() | |
return (content_reader, headers) | |
def build_response(resource, headers, handler): | |
content, response_headers = file_browser(resource) | |
if content is None: | |
return (HTTP_VERSION + " 404 File Not Found\r\n\r\n").encode() | |
else: | |
status_line = " ".join([HTTP_VERSION, '202', 'Accepted']).encode() | |
headers_bytes = ("\r\n".join(map(':'.join, list(response_headers.items()))) + '\r\n').encode() | |
return handler(status_line, headers_bytes, content) | |
@parse_path | |
def handle_GET(resource, headers): | |
def handler(status_line, headers_bytes, content_reader): | |
return b"\r\n".join([status_line, headers_bytes, content_reader()]) | |
return build_response(resource, headers, handler) | |
@parse_path | |
def handle_HEAD(resource, headers): | |
def handler(status_line, headers_bytes, content): | |
return b"\r\n".join([status_line, headers_bytes, '']) | |
return build_response(resource, headers, handler) | |
def missing_method(resource, headers): | |
return HTTP_VERSION + " 405 Method Not Allowed\r\n\r\n" | |
methodmap = defaultdict(lambda : missing_method) | |
methodmap.update({'GET': handle_GET, 'HEAD': handle_HEAD}) | |
def handle_client(sock): | |
data = sock.recv(1000) | |
while not data.endswith(b"\r\n\r\n"): | |
newdata = sock.recv(1000) | |
if not newdata: | |
return | |
data += newdata | |
headers = data.strip().splitlines() | |
request, headers = headers[0], headers[1:] | |
headers = dict(map(lambda s: s.split(b':', 1), headers)) | |
try: | |
method, resource, http_version = request.split() | |
except ValueError: | |
response = HTTP_VERSION + " 400 Bad Request\r\n\r\n" | |
else: | |
method = method.decode() | |
if method not in HTTP_METHODS: | |
response = HTTP_VERSION + " 501 Not Implemented\r\n\r\n" | |
else: | |
response = methodmap[method](resource, headers) | |
sock.sendall(response) | |
if __name__ == '__main__': | |
sock = socket(AF_INET, SOCK_STREAM) | |
sock.bind(('', int(args.get(1, 8080)))) | |
sock.listen(1000) | |
while True: | |
with closing(sock.accept()[0]) as newsock: | |
handle_client(newsock) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment