Created
May 19, 2015 18:14
-
-
Save amirouche/6a2959fcb5995e47a6f2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import mimetypes | |
from asyncio import coroutine | |
from asyncio import get_event_loop | |
from aiohttp.web import Application | |
from aiohttp.web import StaticRoute | |
from aiohttp.web import HTTPNotFound | |
from aiohttp.web import HTTPMovedPermanently | |
from aiohttp.web import StreamResponse | |
class StaticRouteWithIndex(StaticRoute):
    """Static route that also serves directory indexes and listings.

    For a request that maps to a regular file the file is streamed back.
    For a directory, the first existing entry of ``index_names`` is served;
    when there is none, a minimal HTML listing of the directory is returned.
    A directory requested without a trailing slash is redirected to the
    slash-terminated URL so relative links in the page resolve correctly.

    NOTE(review): ``self._directory`` is not set in this class; it is
    presumably provided by ``StaticRoute.__init__`` -- confirm against the
    installed aiohttp version.
    """

    # Read size in bytes used when streaming a file.  Files smaller than
    # this are sent with an explicit Content-Length.
    limit = 8192

    # File names tried, in order, when a directory is requested.
    index_names = ('index.html', 'index.htm')

    @coroutine
    def handle(self, request):
        """Serve the file or directory named by ``match_info['filename']``.

        Returns the started ``StreamResponse``.  Raises ``HTTPNotFound`` when
        the target is missing, is not a regular file or directory, or lies
        outside the served directory; raises ``HTTPMovedPermanently`` when a
        directory is requested without a trailing slash.
        """
        filename = request.match_info['filename']
        root = os.path.abspath(self._directory)
        # Normalise before checking containment: a substring test for '..'
        # misses absolute paths (os.path.join drops ``root`` when
        # ``filename`` starts with '/') and rejects valid names like 'a..b'.
        filepath = os.path.abspath(os.path.join(root, filename))
        inside_root = (
            filepath == root
            or filepath.startswith(root.rstrip(os.sep) + os.sep)
        )
        if not inside_root or not os.path.exists(filepath):
            print('not found %s' % filepath)
            raise HTTPNotFound()

        if os.path.isdir(filepath):
            if filename and not filename.endswith('/'):
                # Redirect based on the URL the client actually used rather
                # than slicing the filesystem path.
                raise HTTPMovedPermanently(request.path + '/')
            for index in self.index_names:
                candidate = os.path.join(filepath, index)
                if os.path.isfile(candidate):
                    return self._send_file(request, candidate)
            return self._send_listing(request, filepath)

        if not os.path.isfile(filepath):
            # Exists but is neither a regular file nor a directory.
            print('not found %s' % filepath)
            raise HTTPNotFound()
        return self._send_file(request, filepath)

    def _send_listing(self, request, directory):
        """Return an HTML ``<ul>`` listing of the entries in ``directory``."""
        from html import escape
        from urllib.parse import quote

        items = []
        for name in sorted(os.listdir(directory)):
            # Links are relative to the listing URL, which always ends in
            # '/' (see the redirect in ``handle``).
            href = quote(name)
            if os.path.isdir(os.path.join(directory, name)):
                href += '/'
            # File names are untrusted: quote the URL, escape the text.
            items.append(
                '<li><a href="%s">%s</a></li>' % (href, escape(name)))
        output = '<ul>' + ''.join(items) + '</ul>'

        resp = StreamResponse()
        resp.content_type = 'text/html'
        resp.start(request)
        resp.write(output.encode('utf-8'))
        return resp

    def _send_file(self, request, filepath):
        """Stream the regular file at ``filepath`` in ``limit``-sized chunks."""
        resp = StreamResponse()
        resp.content_type = (
            mimetypes.guess_type(filepath)[0] or 'application/octet-stream')
        with open(filepath, 'rb') as f:
            # Size the already-open file so the advertised length matches
            # the file actually being read.
            file_size = os.fstat(f.fileno()).st_size
            if file_size < self.limit:
                resp.content_length = file_size
            resp.start(request)
            chunk = f.read(self.limit)
            while chunk:
                resp.write(chunk)
                chunk = f.read(self.limit)
        print('ok %s' % filepath)
        return resp
@coroutine
def init(loop):
    """Build the application and start listening on port 8001.

    The current working directory is served at ``/`` through a
    ``StaticRouteWithIndex`` route.  Returns a ``(server, handler)`` pair:
    the object produced by ``loop.create_server`` and the request handler
    created by the application.
    """
    application = Application(loop=loop)
    serve_root = os.path.abspath(os.curdir)
    application.router._register_endpoint(
        StaticRouteWithIndex(None, '/', serve_root))
    request_handler = application.make_handler()
    server = yield from loop.create_server(request_handler, '0.0.0.0', 8001)
    return server, request_handler
# Start the server on the default event loop and run until interrupted.
loop = get_event_loop()
srv, handler = loop.run_until_complete(init(loop))
print("Server started at http://127.0.0.1:8001")
try:
    loop.run_forever()
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop the server.
    pass
finally:
    # Stop accepting new connections, finish the open ones, then release
    # the loop so the listening socket and loop resources are not leaked.
    srv.close()
    loop.run_until_complete(srv.wait_closed())
    loop.run_until_complete(handler.finish_connections())
    loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment