Skip to content

Instantly share code, notes, and snippets.

@flying-sheep
Created January 23, 2014 18:11
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save flying-sheep/8583767 to your computer and use it in GitHub Desktop.
Save flying-sheep/8583767 to your computer and use it in GitHub Desktop.
Extension of the Autobahn|Python WebSocket server for HTTP
#!/usr/bin/env python3
#http://hg.python.org/cpython/file/default/Lib/test/test_asyncio
import mimetypes
import sys
from argparse import ArgumentParser
from asyncio import get_event_loop, coroutine, async, CancelledError
from email.utils import formatdate
from functools import partial
from html import escape
from os import listdir, path as p
from urllib.parse import quote, unquote

from aiohttp import Response
from autobahn.asyncio.websocket import WebSocketServerProtocol, WebSocketServerFactory
from autobahn.websocket.http import HttpException
# Command-line interface: where the server should listen.
ARGS = ArgumentParser(description='Run simple http server.')
for _flag, _options in (
    ('--host', {'default': '127.0.0.1', 'help': 'Host name'}),
    ('--port', {'default': 8080, 'type': int, 'help': 'Port number'}),
):
    ARGS.add_argument(_flag, action='store', **_options)
del _flag, _options
# Looks like the argv for running ``slave.R`` through ``Rscript``.
# NOTE(review): no reference to R_CMD is visible in this file — confirm it is
# still needed.
R_CMD = ['Rscript', 'slave.R']
# File names tried, in this order, when the requested path is a directory.
INDEX_FILES = ['index.htm', 'index.html']
# Status-line template; ``status`` must be indexable at [0] and [1]
# (presumably the numeric code and the reason phrase — TODO confirm).
# NOTE(review): no reference to HTTP_STATUS is visible in this file.
HTTP_STATUS = 'HTTP/1.1 {status[0]} {status[1]}'
# HTML error page. Formatted with ``e``, an object exposing ``code`` and
# ``reason`` attributes; str.format inserts both values verbatim, i.e. without
# any HTML escaping.
HTTP_ERROR = '''<!doctype html>
<html>
<head>
<title>Error – {e.code}</title>
</head>
<body>
<h1>Error – {e.code}</h1>
{e.reason}
</body>
</html>'''
# HTML page skeleton for a directory listing with ``title`` and ``entries``
# placeholders.
# NOTE(review): no reference to DIR_LIST is visible in this file.
DIR_LIST = '''<!doctype html>
<html>
<head>
<title>{title}</title>
</head>
<body>
<ul>
{entries}
</ul>
</body>
</html>'''
# A single listing entry; ``name`` is used both as link target and as label.
# NOTE(review): no reference to DIR_ENTRY is visible in this file.
DIR_ENTRY = '<li><a href="{name}">{name}</a></li>'
class HTTPServerProtocol(WebSocketServerProtocol):
    """WebSocket server protocol that also serves files over plain HTTP.

    Request targets are resolved relative to the current working directory.
    A regular file is streamed, a directory containing one of ``INDEX_FILES``
    serves that file, and any other directory gets a generated listing.

    The attributes ``transport``, ``http_status_line`` and ``http_headers`` as
    well as ``dropConnection`` are not defined here; they are presumably
    provided by ``WebSocketServerProtocol`` (verify against the autobahn
    version in use).
    """

    def sendServerStatus(self):
        """Answer the current request, rendering ``HttpException`` as an HTML error page."""
        try:
            self.serve_http()
        except HttpException as e:
            response = Response(self.transport, e.code)
            response.add_header('Content-type', 'text/html; charset=utf-8')
            response.send_headers()
            # The reason can echo parts of the request (e.g. the path), so it
            # must be escaped before being embedded in the HTML page.
            safe = HttpException(e.code, escape(str(e.reason)))
            response.write(HTTP_ERROR.format(e=safe).encode())
            response.write_eof()

    def serve_http(self):
        """Serve the file or directory named by the HTTP request line.

        Raises:
            HttpException: 400 for a malformed request line, 404 for an
                invalid or missing path, 403/500 when the target exists but
                cannot be read. Always raised before any response header has
                been sent, so the caller can still emit a clean error page.
        """
        try:
            method, target, version = self.http_status_line.split()
        except ValueError:
            raise HttpException(400, 'malformed request line')
        print('method = {!r}; path = {!r}; version = {!r}'
              .format(method, target, version))

        # Ignore the query string and decode percent-escapes so the target can
        # be matched against names on disk. Decoding happens *before* the
        # '/.' check so that encoded dot segments ('%2e%2e') are rejected too.
        url_path = unquote(target.partition('?')[0])
        if (not (url_path.isprintable() and url_path.startswith('/'))
                or '/.' in url_path):
            print('bad path', repr(url_path))
            raise HttpException(404, 'path not found: {}'.format(url_path))

        path = '.' + url_path
        if not p.exists(path):
            print('no file', repr(path))
            raise HttpException(404, 'path not found: {}'.format(url_path))

        isdir = p.isdir(path)
        if isdir:
            # A directory with an index file is served as that file.
            for candidate in INDEX_FILES:
                index = p.join(path, candidate)
                if p.exists(index):
                    path, isdir = index, False
                    break

        if isdir and path != './' and path.endswith('/'):
            # Canonical directory URLs carry no trailing slash. Collapsing the
            # leading slashes keeps the Location from being read as a
            # protocol-relative URL ('//host').
            location = quote('/' + url_path.strip('/'))
            print('redirect:', location)
            response = Response(self.transport, 302)
            response.add_header('Location', location)
            response.add_header('Content-Length', '0')
            response.send_headers()
            self._finish_http(response)
            return

        if isdir:
            response = self._send_listing(url_path, path)
        else:
            response = self._send_file(path)
        self._finish_http(response)

    @staticmethod
    def _open_error(err, path):
        """Map an ``OSError`` from the filesystem onto an ``HttpException``."""
        code = 403 if isinstance(err, PermissionError) else 500
        return HttpException(code, 'Could not open file {}'.format(path))

    def _begin_response(self, mtime):
        """Create the 200 response with encoding filters and Last-Modified set.

        Headers are only queued here; the caller sends them once the
        Content-type is known.
        """
        response = Response(self.transport, 200)
        response.add_header('Transfer-Encoding', 'chunked')
        accept_encoding = self.http_headers.get('accept-encoding', '')
        if 'deflate' in accept_encoding:
            response.add_header('Content-Encoding', 'deflate')
            response.add_compression_filter('deflate')
        elif 'gzip' in accept_encoding:
            response.add_header('Content-Encoding', 'gzip')
            response.add_compression_filter('gzip')
        # NOTE(review): 1025 looks like a typo for 1024; kept as is.
        response.add_chunking_filter(1025)
        # HTTP dates must be expressed in GMT.
        response.add_header('Last-Modified', formatdate(mtime, usegmt=True))
        return response

    def _send_listing(self, url_path, path):
        """Write an HTML listing of directory *path* and return the response."""
        try:
            mtime = p.getmtime(path)
            names = sorted(listdir(path))
        except OSError as err:
            raise self._open_error(err, path)

        # Links are absolute because the canonical directory URL has no
        # trailing slash, which would make relative links resolve against the
        # parent directory.
        base = '/' + url_path.strip('/')
        if not base.endswith('/'):
            base += '/'

        response = self._begin_response(mtime)
        response.add_header('Content-type', 'text/html; charset=utf-8')
        response.send_headers()
        response.write(b'<!doctype html>\n')
        response.write(b'<ul>\n')
        for name in names:
            if name.isprintable() and not name.startswith('.'):
                suffix = '/' if p.isdir(p.join(path, name)) else ''
                entry = '<li><a href="{0}">{1}{2}</a></li>\n'.format(
                    quote(base + name), escape(name), suffix)
                response.write(entry.encode())
        response.write(b'</ul>')
        return response

    def _send_file(self, path):
        """Stream the file at *path* and return the response."""
        try:
            mtime = p.getmtime(path)
            fp = open(path, 'rb')
        except OSError as err:
            # Fail before any header is sent so an error page is possible.
            raise self._open_error(err, path)

        with fp:
            mime, _ = mimetypes.guess_type(path)
            response = self._begin_response(mtime)
            response.add_header('Content-type', mime or 'text/plain; charset=utf-8')
            response.send_headers()
            # NOTE(review): 8196 looks like a typo for 8192; kept as is.
            for chunk in iter(partial(fp.read, 8196), b''):
                response.write(chunk)
        return response

    def _finish_http(self, response):
        """Terminate *response* and close the connection as a clean shutdown."""
        response.write_eof()
        self.wasClean = True
        self.remoteCloseReason = 'normal HTTP sent'
        self.dropConnection(abort=False)
def parse_host_port(host, default_port):
    """Split an optional ``:port`` suffix off *host*.

    Accepts ``name``, ``name:port``, a bare IPv6 literal such as ``::1`` and
    the bracketed forms ``[::1]`` / ``[::1]:port``.

    Returns:
        A ``(host, port)`` tuple; *default_port* is used when *host* carries
        no port. Brackets around an IPv6 literal are removed.

    Raises:
        ValueError: if the port suffix is not an integer.
    """
    if host.startswith('['):
        address, _, rest = host[1:].partition(']')
        if rest.startswith(':'):
            return address, int(rest[1:])
        return address, default_port
    # Exactly one colon means ``name:port``; more than one is a bare IPv6
    # literal, which must not be split.
    if host.count(':') == 1:
        name, port = host.split(':')
        return name, int(port)
    return host, default_port


if __name__ == '__main__':
    args = ARGS.parse_args()
    args.host, args.port = parse_host_port(args.host, args.port)
    # IPv6 literals have to be bracketed when combined with a port.
    netloc = '[{}]'.format(args.host) if ':' in args.host else args.host
    factory = WebSocketServerFactory('ws://{}:{}'.format(netloc, args.port), debug=False)
    factory.protocol = HTTPServerProtocol
    loop = get_event_loop()
    coro = loop.create_server(factory, args.host, args.port)
    server = loop.run_until_complete(coro)
    print('serving on', '{}:{}'.format(netloc, args.port))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the server.
        pass
    finally:
        server.close()
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment