Skip to content

Instantly share code, notes, and snippets.

@amirouche
Created May 19, 2015 18:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save amirouche/6a2959fcb5995e47a6f2 to your computer and use it in GitHub Desktop.
Save amirouche/6a2959fcb5995e47a6f2 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import os
import mimetypes
from asyncio import coroutine
from asyncio import get_event_loop
from aiohttp.web import Application
from aiohttp.web import StaticRoute
from aiohttp.web import HTTPNotFound
from aiohttp.web import HTTPMovedPermanently
from aiohttp.web import StreamResponse
class StaticRouteWithIndex(StaticRoute):
limit = 8192
@coroutine
def handle(self, request):
resp = StreamResponse()
filename = request.match_info['filename']
filepath = os.path.join(self._directory, filename)
if '..' in filename:
print('not found %s' % filepath)
raise HTTPNotFound()
if not os.path.exists(filepath):
print('not found %s' % filepath)
raise HTTPNotFound()
if not os.path.isfile(filepath):
directory = filepath
filename = None
if not filepath.endswith('/'):
path = filepath + '/'
path = path[len(self._directory):]
raise HTTPMovedPermanently(path)
for index in ('index.html', 'index.htm'):
path = os.path.join(directory, index)
if os.path.exists(path):
filename = index
filepath = path
break
if not filename and os.path.isdir(filepath):
if not filepath.endswith('/'):
filepath += '/'
names = os.listdir(filepath)
names.sort()
output = '<ul>'
for name in names:
dirname = os.path.join(filepath, name)
if os.path.isdir(dirname):
dirname += '/'
path = dirname[len(self._directory):]
link = '<a href="%s">%s</a>' % (path, name)
output += '<li>' + link + '</li>'
output += '</ul>'
resp.content_type = 'text/html'
resp.start(request)
resp.write(output.encode('utf-8'))
return resp
elif not filename:
print('not found %s' % filepath)
raise HTTPNotFound()
else:
ct = mimetypes.guess_type(filename)[0]
if not ct:
ct = 'application/octet-stream'
resp.content_type = ct
file_size = os.stat(filepath).st_size
single_chunk = file_size < self.limit
if single_chunk:
resp.content_length = file_size
resp.start(request)
with open(filepath, 'rb') as f:
chunk = f.read(self.limit)
if single_chunk:
resp.write(chunk)
else:
while chunk:
resp.write(chunk)
chunk = f.read(self.limit)
print('ok %s' % filepath)
return resp
@coroutine
def init(loop):
path = os.path.abspath(os.curdir)
app = Application(loop=loop)
route = StaticRouteWithIndex(None, '/', path)
app.router._register_endpoint(route)
handler = app.make_handler()
srv = yield from loop.create_server(handler, '0.0.0.0', 8001)
return srv, handler
loop = get_event_loop()
srv, handler = loop.run_until_complete(init(loop))
print("Server started at http://127.0.0.1:8001")
try:
loop.run_forever()
except KeyboardInterrupt:
loop.run_until_complete(handler.finish_connections())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment