Skip to content

Instantly share code, notes, and snippets.

@zeroSteiner
Last active December 10, 2015 22:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save zeroSteiner/4502576 to your computer and use it in GitHub Desktop.
Save zeroSteiner/4502576 to your computer and use it in GitHub Desktop.
A set of classes to make creating HTTP servers with common features such as threading, SSL, basic authentication, 301 redirects, and RPC servers simple.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# AdvancedHTTPServer.py
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of the SecureState Consulting nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Homepage: https://github.com/zeroSteiner/AdvancedHTTPServer
# Author: Spencer McIntyre (zeroSteiner)
# Config File Example
"""
[server]
ip = 0.0.0.0
port = 8080
web_root = /var/www/html
list_directories = True
# Set an ssl_cert to enable SSL
# ssl_cert = /path/to/cert.pem
"""
# The AdvancedHTTPServer systemd service unit file
"""
# Quick How To:
# 1. Copy this file to /etc/systemd/system/pyhttpd.service
# 2. Edit <USER> and run parameters appropriately in the ExecStart option
# 3. Set configuration settings in /etc/pyhttpd.conf
# 4. Run "systemctl daemon-reload"
[Unit]
Description=Python Advanced HTTP Server
After=network.target
[Service]
Type=simple
ExecStart=/sbin/runuser -l <USER> -c "/usr/bin/python -m AdvancedHTTPServer -c /etc/pyhttpd.conf"
ExecStop=/bin/kill -INT $MAINPID
[Install]
WantedBy=multi-user.target
"""
__version__ = '0.3.0'
__all__ = [
'AdvancedHTTPServer',
'AdvancedHTTPServerRegisterPath',
'AdvancedHTTPServerRequestHandler',
'AdvancedHTTPServerRESTAPI',
'AdvancedHTTPServerRPCClient',
'AdvancedHTTPServerRPCError'
]
import BaseHTTPServer
import cgi
import Cookie
import hashlib
import hmac
import httplib
import json
import logging
import logging.handlers
import mimetypes
import os
import posixpath
import re
import shutil
import socket
import SocketServer
import sqlite3
import ssl
import sys
import threading
import traceback
import urllib
import urlparse
import zlib
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
GLOBAL_HANDLER_MAP = {}
SERIALIZER_DRIVERS = {}
SERIALIZER_DRIVERS['application/json'] = {'loads': json.loads, 'dumps': json.dumps}
SERIALIZER_DRIVERS['binary/json'] = {'loads': json.loads, 'dumps': json.dumps}
SERIALIZER_DRIVERS['binary/json+zlib'] = {'loads': lambda d: json.loads(zlib.decompress(d)), 'dumps': lambda d: zlib.compress(json.dumps(d))}
try:
import msgpack
except ImportError:
pass
else:
SERIALIZER_DRIVERS['binary/message-pack'] = {'loads': msgpack.loads, 'dumps': msgpack.dumps}
SERIALIZER_DRIVERS['binary/message-pack+zlib'] = {'loads': lambda d: msgpack.loads(zlib.decompress(d)), 'dumps': lambda d: zlib.compress(msgpack.dumps(d))}
if hasattr(logging, 'NullHandler'):
logging.getLogger('AdvancedHTTPServer').addHandler(logging.NullHandler())
def build_server_from_argparser(description=None, ServerClass=None, HandlerClass=None):
"""
Build a server from command line arguments. If a ServerClass or
HandlerClass is specified, then the object must inherit from the
corresponding AdvancedHTTPServer base class.
:param str description: Description string to be passed to the argument parser.
:param ServerClass: Alternative server class to use.
:type ServerClass: :py:class:`.AdvancedHTTPServer`
:param HandlerClass: Alternative handler class to use.
:type HandlerClass: :py:class:`.AdvancedHTTPServerRequestHandler`
:return: A configured server instance.
:rtype: :py:class:`.AdvancedHTTPServer`
"""
import argparse
import ConfigParser
description = (description or 'AdvancedHTTPServer')
ServerClass = (ServerClass or AdvancedHTTPServer)
HandlerClass = (HandlerClass or AdvancedHTTPServerRequestHandler)
parser = argparse.ArgumentParser(description=description, conflict_handler='resolve')
parser.epilog = 'When a config file is specified with --config the --ip, --port and --web-root options are all ignored.'
parser.add_argument('-w', '--web-root', dest='web_root', action='store', default='.', help='path to the web root directory')
parser.add_argument('-p', '--port', dest='port', action='store', default=8080, type=int, help='port to serve on')
parser.add_argument('-i', '--ip', dest='ip', action='store', default='0.0.0.0', help='the ip address to serve on')
parser.add_argument('--password', dest='password', action='store', default=None, help='password to use for basic authentication')
parser.add_argument('--log-file', dest='log_file', action='store', default=None, help='log information to a file')
parser.add_argument('-c', '--conf', dest='config', action='store', default=None, type=argparse.FileType('r'), help='read settings from a config file')
parser.add_argument('-v', '--version', action='version', version=parser.prog + ' Version: ' + __version__)
parser.add_argument('-L', '--log', dest='loglvl', action='store', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], default='INFO', help='set the logging level')
arguments = parser.parse_args()
logging.getLogger('').setLevel(logging.DEBUG)
console_log_handler = logging.StreamHandler()
console_log_handler.setLevel(getattr(logging, arguments.loglvl))
console_log_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)-8s %(message)s"))
logging.getLogger('').addHandler(console_log_handler)
if arguments.log_file:
main_file_handler = logging.handlers.RotatingFileHandler(arguments.log_file, maxBytes=262144, backupCount=5)
main_file_handler.setLevel(logging.DEBUG)
main_file_handler.setFormatter(logging.Formatter("%(asctime)s %(name)-30s %(levelname)-10s %(message)s"))
logging.getLogger('').setLevel(logging.DEBUG)
logging.getLogger('').addHandler(main_file_handler)
if arguments.config:
config = ConfigParser.ConfigParser()
config.readfp(arguments.config)
server = build_server_from_config(config, 'server', ServerClass=ServerClass, HandlerClass=HandlerClass)
else:
server = ServerClass(HandlerClass, address=(arguments.ip, arguments.port))
server.serve_files_root = arguments.web_root
if arguments.password:
server.auth_add_creds('', arguments.password)
return server
def build_server_from_config(config, section_name, ServerClass=None, HandlerClass=None):
"""
Build a server from a provided :py:class:`ConfigParser.ConfigParser`
instance. If a ServerClass or HandlerClass is specified, then the
object must inherit from the corresponding AdvancedHTTPServer base
class.
:param config: Configuration to retrieve settings from.
:type config: :py:class:`ConfigParser.ConfigParser`
:param str section_name: The section name of the configuration to use.
:param ServerClass: Alternative server class to use.
:type ServerClass: :py:class:`.AdvancedHTTPServer`
:param HandlerClass: Alternative handler class to use.
:type HandlerClass: :py:class:`.AdvancedHTTPServerRequestHandler`
:return: A configured server instance.
:rtype: :py:class:`.AdvancedHTTPServer`
"""
ServerClass = (ServerClass or AdvancedHTTPServer)
HandlerClass = (HandlerClass or AdvancedHTTPServerRequestHandler)
port = config.getint(section_name, 'port')
web_root = None
if config.has_option(section_name, 'web_root'):
web_root = config.get(section_name, 'web_root')
if config.has_option(section_name, 'ip'):
ip = config.get(section_name, 'ip')
else:
ip = '0.0.0.0'
ssl_certfile = None
if config.has_option(section_name, 'ssl_cert'):
ssl_certfile = config.get(section_name, 'ssl_cert')
server = ServerClass(HandlerClass, address=(ip, port), ssl_certfile=ssl_certfile)
if config.has_option(section_name, 'password_type'):
password_type = config.get(section_name, 'password_type')
else:
password_type = 'md5'
if config.has_option(section_name, 'password'):
password = config.get(section_name, 'password')
if config.has_option(section_name, 'username'):
username = config.get(section_name, 'username')
else:
username = ''
server.auth_add_creds(username, password, pwtype=password_type)
cred_idx = 0
while config.has_option(section_name, 'password' + str(cred_idx)):
password = config.get(section_name, 'password' + str(cred_idx))
if not config.has_option(section_name, 'username' + str(cred_idx)):
break
username = config.get(section_name, 'username' + str(cred_idx))
server.auth_add_creds(username, password, pwtype=password_type)
cred_idx += 1
if web_root == None:
server.serve_files = False
else:
server.serve_files = True
server.serve_files_root = web_root
if config.has_option(section_name, 'list_directories'):
server.serve_files_list_directories = config.getboolean(section_name, 'list_directories')
return server
class AdvancedHTTPServerRegisterPath(object):
"""
Register a path and handler with the global handler map. This can be
used as a decorator. If no handler is specified then the path and
function will be registered with all :py:class:`.AdvancedHTTPServerRequestHandler`
instances.
.. code-block:: python
@AdvancedHTTPServerRegisterPath('/test')
def handle_test(handler, query):
pass
"""
def __init__(self, path, handler=None):
"""
:param str path: The path regex to register the function to.
:param str handler: A specific :py:class:`.AdvancedHTTPServerRequestHandler` class to register the handler with.
"""
self.path = path
if handler == None or isinstance(handler, (str, unicode)):
self.handler = handler
elif hasattr(handler, '__name__'):
self.handler = handler.__name__
elif hasattr(handler, '__class__'):
self.handler = handler.__class__.__name__
else:
raise ValueError('unknown handler: ' + repr(handler))
def __call__(self, function):
handler_map = GLOBAL_HANDLER_MAP.get(self.handler, {})
handler_map[self.path] = function
GLOBAL_HANDLER_MAP[self.handler] = handler_map
return function
class AdvancedHTTPServerRPCError(Exception):
"""
This class represents an RPC error either local or remote. Any errors
in routines executed on the server will raise this error.
"""
def __init__(self, message, status, remote_exception=None):
self.message = message
self.status = status
self.remote_exception = remote_exception
def __repr__(self):
return "{0}(remote_exception={1})".format(self.__class__.__name__, self.is_remote_exception)
@property
def is_remote_exception(self):
"""
This is true of the represented error resulted from an exception
on the remote server.
"""
return bool(self.remote_exception != None)
class AdvancedHTTPServerRPCClient(object):
"""
This object facilitates communication with remote RPC methods as
provided by a :py:class:`.AdvancedHTTPServerRequestHandler` instance.
Once created this object can be called directly, doing so is the same
as using the call method.
This object uses locks internally to be thread safe. Only one thread
can execute a function at a time.
"""
def __init__(self, address, use_ssl=False, username=None, password=None, uri_base='/', hmac_key=None):
"""
:param tuple address: The address of the server to conenct to as (host, port).
:param bool use_ssl: Whether to connect with SSL or not.
:param str username: The username to authenticate with.
:param str password: The password to authenticate with.
:param str uri_base: An optional prefix for all methods.
:param str hmac_key: An HMAC key to use for request authentication.
"""
self.host = str(address[0])
self.port = int(address[1])
if not hasattr(self, 'logger'):
self.logger = logging.getLogger('AdvancedHTTPServerRPCClient')
self.use_ssl = bool(use_ssl)
self.uri_base = str(uri_base)
self.username = (str(username) if username != None else None)
self.password = (str(password) if password != None else None)
self.hmac_key = (str(hmac_key) if hmac_key != None else None)
self.lock = threading.RLock()
self.serializer_name = SERIALIZER_DRIVERS.keys()[-1]
self.serializer = SERIALIZER_DRIVERS[self.serializer_name]
self.reconnect()
def __reduce__(self):
address = (self.host, self.port)
return (self.__class__, (address, self.use_ssl, self.username, self.password, self.uri_base, self.hmac_key))
def set_serializer(self, serializer_name):
"""
Configure the serializer to use for communication with the server.
The serializer specified must be valid and in the
:py:data:`.SERIALIZER_DRIVERS` map.
:param str serializer_name: The name of the serializer to use.
"""
if not serializer_name in SERIALIZER_DRIVERS:
raise ValueError('unknown serializer: ' + serializer_name)
self.serializer = SERIALIZER_DRIVERS[serializer_name]
self.serializer_name = serializer_name
self.logger.debug('using serializer: ' + serializer_name)
def __call__(self, *args, **kwargs):
return self.call(*args, **kwargs)
def encode(self, data):
"""Encode data with the configured serializer."""
return self.serializer['dumps'](data)
def decode(self, data):
"""Decode data with the configured serializer."""
return self.serializer['loads'](data)
def reconnect(self):
"""Reconnect to the remote server."""
self.lock.acquire()
if self.use_ssl:
self.client = httplib.HTTPSConnection(self.host, self.port)
else:
self.client = httplib.HTTPConnection(self.host, self.port)
self.lock.release()
def call(self, method, *options):
"""
Issue a call to the remote end point to execute the specified
procedure.
:param str method: The name of the remote procedure to execute.
:return: The return value from the remote function.
"""
options = self.encode(options)
headers = {}
headers['Content-Type'] = self.serializer_name
headers['Content-Length'] = str(len(options))
if self.hmac_key != None:
hmac_calculator = hmac.new(self.hmac_key, digestmod=hashlib.sha1)
hmac_calculator.update(options)
headers['HMAC'] = hmac_calculator.hexdigest()
if self.username != None and self.password != None:
headers['Authorization'] = 'Basic ' + (self.username + ':' + self.password).encode('base64').strip()
method = os.path.join(self.uri_base, method)
self.logger.debug('calling RPC method: ' + method[1:])
with self.lock:
self.client.request("RPC", method, options, headers)
resp = self.client.getresponse()
if resp.status != 200:
raise AdvancedHTTPServerRPCError(resp.reason, resp.status)
resp_data = resp.read()
if self.hmac_key != None:
hmac_digest = resp.getheader('hmac')
if not isinstance(hmac_digest, str):
raise AdvancedHTTPServerRPCError('hmac validation error', resp.status)
hmac_digest = hmac_digest.lower()
hmac_calculator = hmac.new(self.hmac_key, digestmod=hashlib.sha1)
hmac_calculator.update(resp_data)
if hmac_digest != hmac_calculator.hexdigest():
raise AdvancedHTTPServerRPCError('hmac validation error', resp.status)
resp_data = self.decode(resp_data)
if not ('exception_occurred' in resp_data and 'result' in resp_data):
raise AdvancedHTTPServerRPCError('missing response information', resp.status)
if resp_data['exception_occurred']:
raise AdvancedHTTPServerRPCError('remote method incured an exception', resp.status, remote_exception=resp_data['exception'])
return resp_data['result']
class AdvancedHTTPServerRPCClientCached(AdvancedHTTPServerRPCClient):
"""
This object builds upon :py:class:`.AdvancedHTTPServerRPCClient` and
provides additional methods for cacheing results in memory.
"""
def __init__(self, *args, **kwargs):
super(AdvancedHTTPServerRPCClientCached, self).__init__(*args, **kwargs)
self.cache_db = sqlite3.connect(':memory:', check_same_thread=False)
cursor = self.cache_db.cursor()
cursor.execute('CREATE TABLE cache (method TEXT NOT NULL, options_hash TEXT NOT NULL, return_value TEXT NOT NULL)')
self.cache_db.commit()
def cache_call(self, method, *options):
"""
Call a remote method and store the result locally. Subsequent
calls to the same method with the same arguments will return the
cached result without invoking the remote procedure.
:param str method: The name of the remote procedure to execute.
:return: The return value from the remote function.
"""
options_hash = hashlib.new('sha1', self.encode(options)).hexdigest()
cursor = self.cache_db.cursor()
cursor.execute('SELECT return_value FROM cache WHERE method = ? AND options_hash = ?', (method, options_hash))
return_value = cursor.fetchone()
if return_value:
return_value = json.loads(return_value[0])
else:
return_value = self.call(method, *options)
cursor.execute('INSERT INTO cache (method, options_hash, return_value) VALUES (?, ?, ?)', (method, options_hash, json.dumps(return_value)))
self.cache_db.commit()
return return_value
def cache_call_refresh(self, method, *options):
"""
Call a remote method and update the local cache with the result
if it already existed.
:param str method: The name of the remote procedure to execute.
:return: The return value from the remote function.
"""
options_hash = hashlib.new('sha1', self.encode(options)).hexdigest()
cursor = self.cache_db.cursor()
cursor.execute('DELETE FROM cache WHERE method = ? AND options_hash = ?', (method, options_hash))
return_value = self.call(method, *options)
cursor.execute('INSERT INTO cache (method, options_hash, return_value) VALUES (?, ?, ?)', (method, options_hash, json.dumps(return_value)))
self.cache_db.commit()
return return_value
def cache_clear(self):
"""Purge the local store of all cached function information."""
cursor = self.cache_db.cursor()
cursor.execute('DELETE FROM cache')
self.cache_db.commit()
self.logger.info('the RPC cache has been clared')
return
class AdvancedHTTPServerNonThreaded(BaseHTTPServer.HTTPServer, object):
"""
This class is used internally by :py:class:`.AdvancedHTTPServer` and
is not intended for use by other classes or functions.
"""
def __init__(self, *args, **kwargs):
if not hasattr(self, 'logger'):
self.logger = logging.getLogger('AdvancedHTTPServer')
self.allow_reuse_address = True
self.using_ssl = False
self.serve_files = False
self.serve_files_root = os.getcwd()
self.serve_files_list_directories = True # irrelevant if serve_files == False
self.serve_robots_txt = True
self.rpc_hmac_key = None
self.basic_auth = None
self.robots_txt = 'User-agent: *\nDisallow: /\n'
self.server_version = 'HTTPServer/' + __version__
super(AdvancedHTTPServerNonThreaded, self).__init__(*args, **kwargs)
def server_bind(self, *args, **kwargs):
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
super(AdvancedHTTPServerNonThreaded, self).server_bind(*args, **kwargs)
def shutdown(self, *args, **kwargs):
super(AdvancedHTTPServerNonThreaded, self).shutdown(*args, **kwargs)
self.socket.close()
class AdvancedHTTPServerThreaded(SocketServer.ThreadingMixIn, AdvancedHTTPServerNonThreaded):
"""
This class is used internally by :py:class:`.AdvancedHTTPServer` and
is not intended for use by other classes or functions.
"""
pass
class AdvancedHTTPServerRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler, object):
"""
This is the primary http request handler class of the
AdvancedHTTPServer framework. Custom request handler must inherit
from this object to be compatible. Instances of this class are created
automatically. This class will handle standard HTTP GET, HEAD, OPTIONS,
and POST requests. Handler functions that are not class methods of
the request handler instance will be passed the instance of the
request handler as the first argument.
"""
if not mimetypes.inited:
mimetypes.init() # try to read system mime.types
extensions_map = mimetypes.types_map.copy()
extensions_map.update({
'': 'application/octet-stream', # Default
'.py': 'text/plain',
'.rb': 'text/plain',
'.c': 'text/plain',
'.h': 'text/plain',
})
def __init__(self, *args, **kwargs):
self.handler_map = {}
self.rpc_handler_map = {}
self.server = args[2]
self.headers_active = False
rest_api_handler = self.server.rest_api_handler
if rest_api_handler:
self.handler_map[rest_api_handler.api_path_regex] = rest_api_handler.dispatch_handler
for map_name in (None, self.__class__.__name__):
handler_map = GLOBAL_HANDLER_MAP.get(map_name, {})
for path, function in handler_map.items():
self.handler_map[path] = function
self.install_handlers()
self.basic_auth_user = None
super(AdvancedHTTPServerRequestHandler, self).__init__(*args, **kwargs)
def version_string(self):
return self.server.server_version
def install_handlers(self):
"""
This method is meant to be over ridden by custom classes. It is
called as part of the __init__ method and provides an opportunity
for the handler maps to be populated with entries.
"""
pass # over ride me
def respond_file(self, file_path, attachment=False, query=None):
"""
Respond to the client by serving a file, either directly or as
an attachment.
:param str file_path: The path to the file to serve, this does not need to be in the web root.
:param bool attachment: Whether to serve the file as a download by setting the Content-Disposition header.
"""
file_path = os.path.abspath(file_path)
try:
file_obj = open(file_path, 'rb')
except IOError:
self.respond_not_found()
return None
self.send_response(200)
self.send_header('Content-Type', self.guess_mime_type(file_path))
fs = os.fstat(file_obj.fileno())
self.send_header('Content-Length', str(fs[6]))
if attachment:
file_name = os.path.basename(file_path)
self.send_header('Content-Disposition', 'attachment; filename=' + file_name)
self.send_header('Last-Modified', self.date_time_string(fs.st_mtime))
self.end_headers()
shutil.copyfileobj(file_obj, self.wfile)
file_obj.close()
return
def respond_list_directory(self, dir_path, query=None):
"""
Respond to the client with an HTML page listing the contents of
the specified directory.
:param str dir_path: The path of the directory to list the contents of.
"""
try:
dir_contents = os.listdir(dir_path)
except os.error:
self.respond_not_found()
return None
if os.path.normpath(dir_path) != self.server.serve_files_root:
dir_contents.append('..')
dir_contents.sort(key=lambda a: a.lower())
f = StringIO()
displaypath = cgi.escape(urllib.unquote(self.path))
f.write('<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">\n')
f.write('<html>\n<title>Directory listing for ' + displaypath + '</title>\n')
f.write('<body>\n<h2>Directory listing for ' + displaypath + '</h2>\n')
f.write('<hr>\n<ul>\n')
for name in dir_contents:
fullname = os.path.join(dir_path, name)
displayname = linkname = name
# Append / for directories or @ for symbolic links
if os.path.isdir(fullname):
displayname = name + "/"
linkname = name + "/"
if os.path.islink(fullname):
displayname = name + "@"
# Note: a link to a directory displays with @ and links with /
f.write('<li><a href="' + urllib.quote(linkname) + '">' + cgi.escape(displayname) + '</a>\n')
f.write('</ul>\n<hr>\n</body>\n</html>\n')
length = f.tell()
f.seek(0)
self.send_response(200)
encoding = sys.getfilesystemencoding()
self.send_header('Content-Type', 'text/html; charset=' + encoding)
self.send_header('Content-Length', str(length))
self.end_headers()
shutil.copyfileobj(f, self.wfile)
f.close()
return
def respond_not_found(self):
"""Respond to the client with a default 404 message."""
self.send_response(404)
self.send_header('Content-Type', 'text/html')
self.end_headers()
self.wfile.write('Resource Not Found\n')
return
def respond_redirect(self, location='/'):
"""
Respond to the client with a 301 message and redirect them with
a Location header.
:param str location: The new location to redirect the client to.
"""
self.send_response(301)
self.send_header('Location', location)
self.end_headers()
return
def respond_server_error(self, status=None, status_line=None, message=None):
"""
Handle an internal server error, logging a traceback if executed
within an exception handler.
:param int status: The status code to respond to the client with.
:param str status_line: The status message to respond to the client with.
:param str message: The body of the response that is sent to the client.
"""
(ex_type, ex_value, ex_traceback) = sys.exc_info()
if ex_type:
(ex_file_name, ex_line, _, _) = traceback.extract_tb(ex_traceback)[-1]
line_info = "{0}:{1}".format(ex_file_name, ex_line)
log_msg = "encountered {0} in {1}".format(repr(ex_value), line_info)
self.server.logger.error(log_msg)
status = (status or 500)
status_line = (status_line or httplib.responses.get(status, 'Internal Server Error')).strip()
self.send_response(status, status_line)
message = (message or status_line)
if isinstance(message, (str, unicode)):
self.send_header('Content-Length', len(message))
self.end_headers()
self.wfile.write(message)
elif hasattr(message, 'fileno'):
fs = os.fstat(message.fileno())
self.send_header('Content-Length', str(fs[6]))
self.end_headers()
shutil.copyfileobj(message, self.wfile)
else:
self.end_headers()
return
def respond_unauthorized(self, request_authentication=False):
"""
Respond to the client that the request is unauthorized.
:param bool request_authentication: Whether to request basic authentication information by sending a WWW-Authenticate header.
"""
self.send_response(401)
if request_authentication:
self.send_header('WWW-Authenticate', 'Basic realm="' + self.server_version + '"')
self.send_header('Content-Type', 'text/html')
self.end_headers()
self.wfile.write('Unauthorized\n')
return
def dispatch_handler(self, query=None):
"""
Dispatch functions based on the established handler_map. It is
generally not necessary to override this function and doing so
will prevent any handlers from being executed. This function is
executed automatically when requests of either GET, HEAD, or POST
are received.
:param dict query: Parsed query parameters from the corresponding request.
"""
query = (query or {})
# normalize the path
# abandon query parameters
self.path = self.path.split('?', 1)[0]
self.path = self.path.split('#', 1)[0]
self.original_path = urllib.unquote(self.path)
self.path = posixpath.normpath(self.original_path)
words = self.path.split('/')
words = filter(None, words)
tmp_path = ''
for word in words:
drive, word = os.path.splitdrive(word)
head, word = os.path.split(word)
if word in (os.curdir, os.pardir):
continue
tmp_path = os.path.join(tmp_path, word)
self.path = tmp_path
if self.path == 'robots.txt' and self.server.serve_robots_txt:
self.send_response(200)
self.send_header('Content-Type', 'text/plain')
self.end_headers()
self.wfile.write(self.server.robots_txt)
return
self.cookies = Cookie.SimpleCookie(self.headers.get('cookie', ''))
for (path_regex, handler) in self.handler_map.items():
if re.match(path_regex, self.path):
try:
if hasattr(self, handler.__name__) and (handler == getattr(self, handler.__name__).__func__ or handler == getattr(self, handler.__name__)):
getattr(self, handler.__name__)(query)
else:
handler(self, query)
except:
self.respond_server_error()
return
if not self.server.serve_files:
self.respond_not_found()
return
file_path = self.server.serve_files_root
file_path = os.path.join(file_path, tmp_path)
if os.path.isfile(file_path) and os.access(file_path, os.R_OK):
self.respond_file(file_path, query=query)
return
elif os.path.isdir(file_path) and os.access(file_path, os.R_OK):
if not self.original_path.endswith('/'):
# redirect browser, doing what apache does
destination = self.path + '/'
if self.command == 'GET':
destination += '?' + urllib.urlencode(self.query_data, True)
self.respond_redirect(destination)
return
for index in ['index.html', 'index.htm']:
index = os.path.join(file_path, index)
if os.path.isfile(index) and os.access(index, os.R_OK):
self.respond_file(index, query=query)
return
if self.server.serve_files_list_directories:
self.respond_list_directory(file_path, query=query)
return
self.respond_not_found()
return
def send_response(self, *args, **kwargs):
super(AdvancedHTTPServerRequestHandler, self).send_response(*args, **kwargs)
self.headers_active = True
def end_headers(self):
super(AdvancedHTTPServerRequestHandler, self).end_headers()
self.headers_active = False
if self.command == 'HEAD':
self.wfile.close()
self.wfile = open(os.devnull, 'wb')
def guess_mime_type(self, path):
"""
Guess an appropriate MIME type based on the extension of the
provided path.
:param str path: The of the file to analyze.
:return: The guessed MIME type of the default if non are found.
:rtype: str
"""
base, ext = posixpath.splitext(path)
if ext in self.extensions_map:
return self.extensions_map[ext]
ext = ext.lower()
if ext in self.extensions_map:
return self.extensions_map[ext]
else:
return self.extensions_map['']
def stock_handler_respond_unauthorized(self, query):
"""This method provides a handler suitable to be used in the handler_map."""
self.respond_unauthorized()
return
def stock_handler_respond_not_found(self, query):
"""This method provides a handler suitable to be used in the handler_map."""
self.respond_not_found()
return
def check_authorization(self):
"""
Check for the presence of a basic auth Authorization header and
if the credentials contained within are valid.
"""
try:
if self.server.basic_auth == None:
return True
auth_info = self.headers.getheader('Authorization')
if not auth_info:
return False
auth_info = auth_info.split()
if len(auth_info) != 2:
return False
if auth_info[0] != 'Basic':
return False
auth_info = auth_info[1].decode('base64')
username = auth_info.split(':')[0]
password = ':'.join(auth_info.split(':')[1:])
if hasattr(self, 'custom_authentication'):
if self.custom_authentication(username, password):
self.basic_auth_user = username
return True
return False
if not username in self.server.basic_auth:
self.server.logger.warning('received invalid username: ' + username)
return False
password_data = self.server.basic_auth[username]
if password_data['type'] == 'plain':
if password == password_data['value']:
self.basic_auth_user = username
return True
elif password_data['type'] == 'md5':
if hashlib.new('md5', password).hexdigest() == password_data['value']:
self.basic_auth_user = username
return True
elif password_data['type'] == 'sha1':
if hashlib.new('sha1', password).hexdigest() == password_data['value']:
self.basic_auth_user = username
return True
self.server.logger.warning('received invalid password from user: ' + username)
return False
except:
return False
def cookie_get(self, name):
"""
Check for a cookie value by name.
:param str name: Name of the cookie value to retreive.
:return: Returns the cookie value if it's set or None if it's not found.
"""
if not hasattr(self, 'cookies'):
return None
if self.cookies.get(name):
return self.cookies.get(name).value
return None
def cookie_set(self, name, value):
"""
Set the value of a client cookie. This can only be called while
headers can be sent.
:param str name: The name of the cookie value to set.
:param str value: The value of the cookie to set.
"""
if not self.headers_active:
raise RuntimeError('headers have already been ended')
cookie = "{0}={1}; Path=/; HttpOnly".format(name, value)
self.send_header('Set-Cookie', cookie)
def do_GET(self):
if not self.check_authorization():
self.respond_unauthorized(request_authentication=True)
return
uri = urlparse.urlparse(self.path)
self.path = uri.path
self.query_data = urlparse.parse_qs(uri.query)
self.dispatch_handler(self.query_data)
return
def do_HEAD(self):
self.do_GET()
def do_POST(self):
if not self.check_authorization():
self.respond_unauthorized(request_authentication=True)
return
content_length = int(self.headers.getheader('content-length') or 0)
data = self.rfile.read(content_length)
self.query_data_raw = data
content_type = self.headers.getheader('content-type') or ''
content_type = content_type.split(';', 1)[0]
self.query_data = {}
try:
if content_type.startswith('application/json'):
data = json.loads(data)
if isinstance(data, dict):
self.query_data = dict(map(lambda i: (i[0], [i[1]]), data.items()))
else:
self.query_data = urlparse.parse_qs(data, keep_blank_values=1)
except:
self.respond_server_error(400)
else:
self.dispatch_handler(self.query_data)
return
def do_OPTIONS(self):
available_methods = map(lambda x: x[3:], filter(lambda x: x.startswith('do_'), dir(self)))
if 'RPC' in available_methods and len(self.rpc_handler_map) == 0:
available_methods.remove('RPC')
self.send_response(200)
self.send_header('Allow', ', '.join(available_methods))
self.end_headers()
def do_RPC(self):
if not self.check_authorization():
self.respond_unauthorized(request_authentication=True)
return
data_length = self.headers.getheader('content-length')
if self.headers.getheader('content-length') == None:
self.send_error(411)
return
data_type = self.headers.getheader('content-type')
if data_type == None:
self.send_error(400, 'Missing Header: Content-Type')
return
if not data_type in SERIALIZER_DRIVERS:
self.send_error(400, 'Invalid Content-Type')
return
serializer = SERIALIZER_DRIVERS[data_type]
try:
data_length = int(self.headers.getheader('content-length'))
data = self.rfile.read(data_length)
except:
self.send_error(400, 'Invalid Data')
return
if self.server.rpc_hmac_key != None:
hmac_digest = self.headers.getheader('hmac')
if not isinstance(hmac_digest, str):
self.respond_unauthorized(request_authentication=True)
return
hmac_digest = hmac_digest.lower()
hmac_calculator = hmac.new(self.server.rpc_hmac_key, digestmod=hashlib.sha1)
hmac_calculator.update(data)
if hmac_digest != hmac_calculator.hexdigest():
self.server.logger.warning('failed to validate HMAC digest')
self.respond_unauthorized(request_authentication=True)
return
try:
data = serializer['loads'](data)
if type(data) == list:
data = tuple(data)
assert(type(data) == tuple)
except:
self.server.logger.warning('serializer failed to load data')
self.send_error(400, 'Invalid Data')
return
rpc_handler = None
for (path_regex, handler) in self.rpc_handler_map.items():
if re.match(path_regex, self.path):
rpc_handler = handler
break
if not rpc_handler:
self.respond_server_error(501)
return
self.server.logger.info('running RPC method: ' + self.path)
response = {'result': None, 'exception_occurred': False}
try:
result = rpc_handler(*data)
response['result'] = result
except Exception as error:
response['exception_occurred'] = True
exc = {}
exc['name'] = error.__class__.__name__
exc['message'] = error.message
response['exception'] = exc
self.server.logger.error('error: ' + error.__class__.__name__ + ' occurred while calling RPC method: ' + self.path)
try:
response = serializer['dumps'](response)
except:
self.respond_server_error(message='Failed To Pack Response')
return
self.send_response(200)
self.send_header('Content-Type', data_type)
if self.server.rpc_hmac_key != None:
hmac_calculator = hmac.new(self.server.rpc_hmac_key, digestmod=hashlib.sha1)
hmac_calculator.update(response)
self.send_header('HMAC', hmac_calculator.hexdigest())
self.end_headers()
self.wfile.write(response)
return
def log_error(self, format, *args):
self.server.logger.warning(self.address_string() + ' ' + format % args)
def log_message(self, format, *args):
self.server.logger.info(self.address_string() + ' ' + format % args)
class AdvancedHTTPServerRESTAPI(object):
"""
This is a manager REST request handlers. It allows them to be grouped
together and to use a common base path, '/api/' by default. Handler
functions that are not class methods will be passed the instance of
the managing class as the first argument.
"""
def __init__(self, api_path='/api/'):
"""
:param str api_path: A base path to be prefixed to all handlers.
"""
self.handler_map = {}
map_name = self.__class__.__name__
handler_map = GLOBAL_HANDLER_MAP.get(map_name, {})
for path, function in handler_map.items():
self.handler_map[path] = function
self.install_handlers()
@property
def api_path_regex(self):
return '^' + self.api_path.strip('/') + '/\S'
def install_handlers(self):
"""
This method is meant to be over ridden by custom classes. It is
called as part of the __init__ method and provides an opportunity
for the handler maps to be populated with entries.
"""
pass # over ride me
def dispatch_handler(self, request_handler, query):
"""
Dispatch functions based on the established handler_map. It is
generally not necessary to override this function and doing so
will prevent any handlers from being executed. This function is
executed automatically when requests are received that being with
the specified api base path.
:param dict query: Parsed query parameters from the corresponding request.
"""
path = request_handler.path
prefix_len = len(re.match(self.api_path_regex, path).group(0)) - 1
path = path[prefix_len:]
handler_found = False
result = None
arguments = []
if request_handler.command == 'GET':
arguments = dict(map(lambda i: (i[0], i[1][-1]), query.items()))
if request_handler.command == 'POST':
arguments = json.loads(request_handler.query_data_raw)
if not isinstance(arguments, (dict, list, tuple)):
arguments = [arguments]
for (path_regex, handler) in self.handler_map.items():
if re.match(path_regex, path):
handler_found = True
if hasattr(self, handler.__name__) and (handler == getattr(self, handler.__name__).__func__ or handler == getattr(self, handler.__name__)):
if isinstance(arguments, dict):
result = getattr(self, handler.__name__)(**arguments)
else:
result = getattr(self, handler.__name__)(*arguments)
else:
if isinstance(arguments, dict):
result = handler(self, **arguments)
else:
result = handler(self, *arguments)
break
if not handler_found:
request_handler.respond_server_error(501)
return
result = json.dumps(result) + '\n'
request_handler.send_response(200)
request_handler.send_header('Content-Type', 'application/json')
request_handler.send_header('Content-Length', len(result))
request_handler.end_headers()
request_handler.wfile.write(result)
class AdvancedHTTPServer(object):
"""
This is the primary server class for the AdvancedHTTPServer framework.
Custom servers must inherit from this object to be compatible. When
no *address* parameter is specified the address '0.0.0.0' is used and
the port is guessed based on if the server is run as root or not and
SSL is used.
"""
def __init__(self, RequestHandler, address=None, use_threads=True, ssl_certfile=None):
"""
:param RequestHandler: The request handler class to use.
:type RequestHandler: :py:class:`.AdvancedHTTPServerRequestHandler`
:param tuple address: The address to bind to in the format (host, port).
:param bool use_threads: Whether to enable the use of a threaded handler.
:param str ssl_certfile: A SSL certificate file to use, setting this enables SSL.
"""
self.use_ssl = bool(ssl_certfile)
if address == None:
if self.use_ssl:
if os.getuid():
address = ('0.0.0.0', 8443)
else:
address = ('0.0.0.0', 443)
else:
if os.getuid():
address = ('0.0.0.0', 8080)
else:
address = ('0.0.0.0', 80)
self.address = address
self.ssl_certfile = ssl_certfile
if not hasattr(self, 'logger'):
self.logger = logging.getLogger('AdvancedHTTPServer')
self.server_started = False
if use_threads:
self.http_server = AdvancedHTTPServerThreaded(address, RequestHandler)
else:
self.http_server = AdvancedHTTPServerNonThreaded(address, RequestHandler)
self.logger.info('listening on ' + address[0] + ':' + str(address[1]))
self.http_server.rest_api_handler = None
if self.use_ssl:
self.http_server.socket = ssl.wrap_socket(self.http_server.socket, certfile=ssl_certfile, server_side=True)
self.http_server.using_ssl = True
self.logger.info(address[0] + ':' + str(address[1]) + ' - ssl has been enabled')
if hasattr(RequestHandler, 'custom_authentication'):
self.logger.debug(address[0] + ':' + str(address[1]) + ' - a custom authentication function is being used')
self.auth_set(True)
def init_rest_api(self, rest_api_handler):
"""
Initialize a REST API Handler.
:param rest_api_handler: The handler instance to register with the server.
:type rest_api_handler: :py:class:`.AdvancedHTTPServerRESTAPI`
"""
if not isinstance(rest_api_handler, AdvancedHTTPServerRESTAPI):
raise ValueError('rest_api_handler must be an instance of AdvancedHTTPServerRESTAPI')
self.http_server.rest_api_handler = rest_api_handler
self.logger.debug(self.address[0] + ':' + str(self.address[1]) + ' - a REST API handler has been registered')
def serve_forever(self, fork=False):
"""
Start handling requests. This method must be called and does not
return unless the :py:func:`.shutdown` method is called from
another thread.
:param bool fork: Whether to fork or not before serving content.
"""
if fork:
if not hasattr(os, 'fork'):
raise OSError('os.fork is not available')
child_pid = os.fork()
if child_pid != 0:
self.logger.info(self.address[0] + ':' + str(self.address[1]) + ' - forked child process: ' + str(child_pid))
return child_pid
self.server_started = True
self.http_server.serve_forever()
return 0
def shutdown(self):
"""Shutdown the server and stop responding to requests."""
if self.server_started:
self.http_server.shutdown()
@property
def serve_files(self):
"""
Whether to enable serving files or not.
:type: bool
"""
return self.http_server.serve_files
@serve_files.setter
def serve_files(self, value):
value = bool(value)
if self.http_server.serve_files == value:
return
self.http_server.serve_files = value
if value:
self.logger.info(self.address[0] + ':' + str(self.address[1]) + ' - serving files has been enabled')
else:
self.logger.info(self.address[0] + ':' + str(self.address[1]) + ' - serving files has been disabled')
@property
def serve_files_root(self):
"""
The web root to use when serving files.
:type: str
"""
return self.http_server.serve_files_root
@serve_files_root.setter
def serve_files_root(self, value):
self.http_server.serve_files_root = os.path.abspath(value)
@property
def serve_files_list_directories(self):
"""
Whether to list the contents of directories. This is only honored
when :py:attr:`.serve_files` is True.
:type: bool
"""
return self.http_server.serve_files_list_directories
@serve_files_list_directories.setter
def serve_files_list_directories(self, value):
self.http_server.serve_files_list_directories = bool(value)
@property
def serve_robots_txt(self):
"""
Whether to serve a default robots.txt file which denies everything.
:type: bool
"""
return self.http_server.serve_robots_txt
@serve_robots_txt.setter
def serve_robots_txt(self, value):
self.http_server.serve_robots_txt = bool(value)
@property
def rpc_hmac_key(self):
"""
An HMAC key to be used for authenticating RPC requests.
:type: str
"""
return self.http_server.rpc_hmac_key
@rpc_hmac_key.setter
def rpc_hmac_key(self, value):
self.http_server.rpc_hmac_key = str(value)
@property
def server_version(self):
"""
The server version to be sent to clients in headers.
:type: str
"""
return self.http_server.server_version
@server_version.setter
def server_version(self, value):
self.http_server.server_version = str(value)
def auth_set(self, status):
"""
Enable or disable requring authentication on all incoming requests.
:param bool status: Whether to enable or disable requiring authentication.
"""
if not bool(status):
self.http_server.basic_auth = None
self.logger.info(self.address[0] + ':' + str(self.address[1]) + ' - basic authentication has been disabled')
else:
self.http_server.basic_auth = {}
self.logger.info(self.address[0] + ':' + str(self.address[1]) + ' - basic authentication has been enabled')
def auth_delete_creds(self, username=None):
"""
Delete the credentials for a specific username if specified or all
stored credentials.
:param str username: The username of the credentials to delete.
"""
if not username:
self.http_server.basic_auth = {}
self.logger.info(self.address[0] + ':' + str(self.address[1]) + ' - basic authentication database has been cleared of all entries')
return
del self.http_server.basic_auth[username]
def auth_add_creds(self, username, password, pwtype='plain'):
"""
Add a valid set of credentials to be accepted for authentication.
Calling this function will automatically enable requiring
authentication. Passwords can be provided in either plaintext or
as a hash by specifying the hash type in the *pwtype* argument.
:param str username: The username of the credentials to be added.
:param str password: The password data of the credentials to be added.
:param str pwtype: The type of the *password* data, (plain, md5 or sha1).
"""
pwtype = pwtype.lower()
if not pwtype in ('plain', 'md5', 'sha1'):
raise ValueError('invalid password type, must be (\'plain\', \'md5\', \'sha1\')')
if self.http_server.basic_auth == None:
self.http_server.basic_auth = {}
self.logger.info(self.address[0] + ':' + str(self.address[1]) + ' - basic authentication has been enabled')
if pwtype != 'plain':
password = password.lower()
self.http_server.basic_auth[username] = {'value': password, 'type': pwtype}
def main():
try:
server = build_server_from_argparser()
except ImportError:
server = AdvancedHTTPServer(AdvancedHTTPServerRequestHandler)
server.serve_files_root = '.'
server.serve_files_root = (server.serve_files_root or '.')
server.serve_files = True
try:
server.serve_forever()
except KeyboardInterrupt:
pass
server.shutdown()
logging.shutdown()
return 0
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment