Last active
January 21, 2019 20:14
-
-
Save lemon24/2dcb9c89ebe9c268f29572b90f53adf4 to your computer and use it in GitHub Desktop.
Run an HTTP(S) server with various customizations using only stuff from the Python stdlib.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Run an HTTP(S) server with various customizations using only stuff from stdlib. | |
See make_httpd() and run_httpd() for details. | |
""" | |
import http.server | |
import contextlib | |
import threading | |
import gzip | |
import ssl | |
import io | |
class GzipHandlerMixin: | |
def send_head(self): | |
self.end_headers = lambda: None | |
original_send_header = self.send_header | |
def send_header(keyword, value): | |
if keyword == 'Content-Length': | |
return | |
original_send_header(keyword, value) | |
self.send_header = send_header | |
f = super().send_head() | |
del self.end_headers | |
del self.send_header | |
if f: | |
try: | |
data = f.read() | |
finally: | |
f.close() | |
compressed_file = io.BytesIO() | |
gz = gzip.GzipFile(fileobj=compressed_file, mode='wb') | |
gz.write(data) | |
gz.close() | |
self.send_header('Content-Encoding', 'gzip') | |
self.send_header('Content-Length', str(len(compressed_file.getvalue()))) | |
self.end_headers() | |
compressed_file.seek(0) | |
return compressed_file | |
return f | |
class LastModifiedHandlerMixin: | |
@property | |
def last_modified(self): | |
raise NotImplementedError | |
def date_time_string(self, timestamp=None): | |
return self.last_modified | |
class ErrorCodeHandlerMixin: | |
@property | |
def error_code(self): | |
raise NotImplementedError | |
def do_GET(self): | |
self.send_error(self.error_code) | |
class RequestHeadersHandlerMixin: | |
def send_response(self, *args, **kwargs): | |
self.server.request_headers = self.headers | |
return super().send_response(*args, **kwargs) | |
class ProtocolServerMixin: | |
protocol = 'http' | |
@property | |
def url(self): | |
return "{s.protocol}://{s.server_address[0]}:{s.server_address[1]}/".format(s=self) | |
class HTTPSServerMixin: | |
protocol = 'https' | |
def __init__(self, *args, **kwargs): | |
certfile = kwargs.pop('certfile') | |
super().__init__(*args, **kwargs) | |
self.socket = ssl.wrap_socket( | |
self.socket, certfile=certfile, server_side=True) | |
def make_httpd(server_address=('127.0.0.1', 0), | |
gzip=False, last_modified=None, error_code=None, | |
request_headers=False, certfile=None): | |
"""Return a configured http.server.HTTPServer subclass instance | |
that will serve files from the current directory using a subclass | |
of http.server.SimpleHTTPRequestHandler. | |
The server URL will be available as server.url. | |
Args: | |
server_address (tuple(str, int)): The address to bind to and listen on. | |
gzip (bool): If true, compress the response body using gzip. | |
last_modified (str): Set the Last-Modified header to this value. | |
error_code (int): Set the status code of all GET responses to this value. | |
request_headers (bool): If true, make the request headers available | |
as server.request_headers on the returned server object after | |
each request. | |
certfile (str): Make the server use TLS and use this file as certificate; | |
should be the path to a certificate. | |
Returns: | |
http.server.HTTPServer: A configured server instance. | |
""" | |
if gzip and error_code: | |
raise ValueError("gzip doesn't work with error_code") | |
handler_bases = [http.server.SimpleHTTPRequestHandler] | |
handler_attrs = {} | |
server_bases = [ProtocolServerMixin, http.server.HTTPServer] | |
server_kwargs = {} | |
if gzip: | |
handler_bases.insert(0, GzipHandlerMixin) | |
if last_modified: | |
handler_bases.insert(0, LastModifiedHandlerMixin) | |
handler_attrs['last_modified'] = last_modified | |
if error_code: | |
handler_bases.insert(0, ErrorCodeHandlerMixin) | |
handler_attrs['error_code'] = error_code | |
if request_headers: | |
handler_bases.insert(0, RequestHeadersHandlerMixin) | |
if certfile: | |
server_bases.insert(0, HTTPSServerMixin) | |
server_kwargs['certfile'] = certfile | |
Handler = type('Handler', tuple(handler_bases), handler_attrs) | |
Server = type('Server', tuple(server_bases), {}) | |
return Server(server_address, Handler, **server_kwargs) | |
@contextlib.contextmanager | |
def run_httpd(**kwargs): | |
"""Run an HTTP server as returned by make_httpd() in a background thread. | |
""" | |
httpd = make_httpd(**kwargs) | |
threading.Thread(target=httpd.serve_forever).start() | |
try: | |
yield httpd | |
finally: | |
httpd.shutdown() | |
if __name__ == '__main__': # pragma: no cover | |
# Run an HTTPS server in the background that compresses the response body | |
# and returns a custom Last-Modified header; make a request, then print | |
# the request headers as received by the server. | |
# | |
# To generate a certificate: | |
# | |
# openssl req -new -x509 -keyout server.pem -out server.pem -days 365 -nodes | |
# | |
# To run the server: | |
# | |
# python run_httpd.py | |
import subprocess | |
kwargs = dict(gzip=True, last_modified='last-modified', | |
request_headers=True, certfile='server.pem') | |
with run_httpd(**kwargs) as httpd: | |
print("\n--- running at", httpd.url) | |
out = subprocess.run( | |
['curl', '-k', '-s', '-D-', '-o/dev/null', httpd.url], | |
stdout=subprocess.PIPE, universal_newlines=True, | |
).stdout | |
print("\n--- curl output", out.strip(), sep='\n') | |
headers = getattr(httpd, 'request_headers', None) | |
if headers: | |
print("\n--- headers", headers, sep='\n') | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment