Skip to content

Instantly share code, notes, and snippets.

@SteffenDE
Last active March 23, 2016 18:19
Show Gist options
  • Save SteffenDE/f1d19190cdaecb93e4a6 to your computer and use it in GitHub Desktop.
Save SteffenDE/f1d19190cdaecb93e4a6 to your computer and use it in GitHub Desktop.
import asyncio
import os
import os.path
import mimetypes
import re
from aiohttp import web, hdrs
from staticresponse import StaticResponse
def _parse_byte_range(range_header, size):
    """Translate a ``Range`` header value into ``(offset, length)``.

    Only the first ``first-last`` pair found in the header is honoured.

    Returns ``None`` when the header is missing or contains no usable range
    (no ``-`` separated numbers, both ends empty, or last < first); the
    caller should then serve the whole file.  Otherwise returns a tuple
    ``(offset, length)`` clamped to ``size``; ``length <= 0`` signals a
    range that cannot be satisfied for a file of ``size`` bytes.
    """
    if not range_header:
        return None
    match = re.search(r'(\d*)-(\d*)', range_header)
    if match is None:
        return None
    first, last = match.groups()
    if not first and not last:
        return None
    if not first:
        # Suffix form "-N": the final N bytes of the file.
        length = min(int(last), size)
        return size - length, length
    start = int(first)
    end = size - 1
    if last:
        if int(last) < start:
            # Inverted range: treat the header as unusable.
            return None
        # Never advertise or send bytes past the end of the file.
        end = min(int(last), size - 1)
    return start, end - start + 1


async def send_file_partial(path, request, chunksize=2**16):
    """
    Stream the file at `path` to the client, honouring a ``Range`` header.

    Usage:
    Inside request handler:
    return (await send_file_partial("/path/to/file.ext", request))

    `path` is the filesystem path of the file to send.
    `request` is the incoming aiohttp request.
    `chunksize` is forwarded to :class:`StaticResponse` as its chunk size.

    Returns the :class:`aiohttp.web.StreamResponse` that was written:
    status 200 for a full transfer, 206 for a ranged one.

    Raises ``web.HTTPInternalServerError`` when `path` cannot be stat'ed,
    ``web.HTTPNotModified`` when a non-ranged request carries an
    ``If-Modified-Since`` that is not older than the file, and
    ``web.HTTPRequestRangeNotSatisfiable`` when the requested range lies
    outside the file.
    """
    try:
        # stat directly instead of exists()+stat() so the file cannot vanish
        # between the two calls.
        st = os.stat(path)
    except OSError:
        raise web.HTTPInternalServerError
    size = st.st_size

    ct, encoding = mimetypes.guess_type(path)
    if not ct:
        ct = 'application/octet-stream'

    byte_range = _parse_byte_range(request.headers.get('Range', None), size)
    if byte_range is None:
        # Plain (non-ranged) request: the conditional check applies here only.
        modsince = request.if_modified_since
        if modsince is not None and st.st_mtime <= modsince.timestamp():
            raise web.HTTPNotModified
        offset, length = 0, size
        r = web.StreamResponse()
    else:
        offset, length = byte_range
        if length <= 0:
            raise web.HTTPRequestRangeNotSatisfiable(
                headers={"Content-Range": 'bytes */{0}'.format(size)})
        r = web.StreamResponse(status=206)
        r.headers["Content-Range"] = 'bytes {0}-{1}/{2}'.format(
            offset, offset + length - 1, size)

    r.headers["Accept-Ranges"] = "bytes"
    r.content_type = ct
    if encoding:
        r.headers[hdrs.CONTENT_ENCODING] = encoding
    r.last_modified = st.st_mtime
    r.content_length = length
    r.set_tcp_cork(True)
    try:
        # prepare() is the coroutine replacement for the deprecated start().
        await r.prepare(request)
        with open(path, 'rb') as f:
            sender = StaticResponse(chunk_size=chunksize, offset=offset)
            await sender.sendfile(request, r, f, length)
    finally:
        r.set_tcp_nodelay(True)
    return r
import asyncio
from aiohttp import web
from aiohttp_filehandler import send_file_partial
async def hello(request):
    """Answer the request by streaming the example CSV file."""
    response = await send_file_partial("/path/to/large/file.csv", request)
    return response


# Example server: one GET route on "/" served on port 8888.
app = web.Application()
router = app.router
router.add_route('GET', '/', hello)
web.run_app(app, port=8888)
import asyncio
import os
class StaticResponse():
    """
    Write part of an open file to a response.

    based on aiohttp StaticRoute
    https://github.com/KeepSafe/aiohttp/blob/master/aiohttp/web_urldispatcher.py
    sends out files using os.sendfile

    `chunk_size` is the read size used by the fallback copy loop.
    `offset` is the position in the file where the transfer starts.
    `nosendfile` forces the fallback copy loop; a non-empty
    ``AIOHTTP_NOSENDFILE`` environment variable does the same.
    """

    def __init__(self, *,
                 chunk_size=2**16, offset=0, nosendfile=False):
        self._chunk_size = chunk_size
        self._offset = offset
        if bool(os.environ.get("AIOHTTP_NOSENDFILE")) or nosendfile:
            # Shadow the class-level `sendfile` for this instance only.
            self.sendfile = self._sendfile_fallback

    def _sendfile_cb(self, fut, out_fd, in_fd, offset, count, loop,
                     registered):
        """
        Send up to `count` bytes with ``os.sendfile`` and complete `fut`.

        Called once directly and then again as a writer callback on
        `out_fd` until all `count` bytes are sent.  `registered` tells
        whether this invocation came from ``loop.add_writer`` and therefore
        has to unregister itself first.
        """
        if registered:
            loop.remove_writer(out_fd)
        if fut.done():
            # The waiter was cancelled (or otherwise finished) meanwhile:
            # stop sending and do not touch the future again.
            return
        try:
            n = os.sendfile(out_fd, in_fd, offset, count)
            if n == 0:  # EOF reached
                n = count
        except (BlockingIOError, InterruptedError):
            # Socket not writable right now; retry when it becomes so.
            n = 0
        except Exception as exc:
            fut.set_exception(exc)
            return
        if n < count:
            loop.add_writer(out_fd, self._sendfile_cb, fut, out_fd, in_fd,
                            offset + n, count - n, loop, True)
        else:
            fut.set_result(None)

    async def _sendfile_system(self, req, resp, fobj, count):
        """
        Write `count` bytes of `fobj` to `resp` starting from `offset` using
        the ``sendfile`` system call.

        `req` should be a :obj:`aiohttp.web.Request` instance.
        `resp` should be a :obj:`aiohttp.web.StreamResponse` instance.
        `fobj` should be an open file object.
        `count` should be an integer > 0.

        When the transport reports an ``sslcontext`` the transfer is
        delegated to :meth:`_sendfile_fallback` instead.
        """
        transport = req.transport
        if transport.get_extra_info("sslcontext"):
            await self._sendfile_fallback(req, resp, fobj, count)
            return
        # Flush pending response data before writing to the socket directly.
        await resp.drain()
        loop = req.app.loop
        out_fd = transport.get_extra_info("socket").fileno()
        in_fd = fobj.fileno()
        fut = asyncio.Future(loop=loop)
        self._sendfile_cb(fut, out_fd, in_fd, self._offset, count, loop, False)
        await fut

    async def _sendfile_fallback(self, req, resp, fobj, count):
        """
        Mimic the :meth:`_sendfile_system` method, but without using the
        ``sendfile`` system call. This should be used on systems that don't
        support the ``sendfile`` system call.

        To avoid blocking the event loop & to keep memory usage low, `fobj` is
        transferred in chunks controlled by the `chunk_size` argument to
        :class:`StaticResponse`.
        """
        chunk_size = self._chunk_size
        fobj.seek(self._offset)  # IMPORTANT: seek to offset
        remaining = count
        while remaining > 0:
            chunk = fobj.read(min(chunk_size, remaining))
            if not chunk:
                # File ended before `count` bytes were available.
                break
            resp.write(chunk)
            await resp.drain()
            # Account for what was actually read, not the requested size.
            remaining -= len(chunk)

    if hasattr(os, "sendfile"):  # pragma: no cover
        sendfile = _sendfile_system
    else:  # pragma: no cover
        sendfile = _sendfile_fallback
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment