Skip to content

Instantly share code, notes, and snippets.

@ojii
Created February 17, 2017 10:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ojii/2f4cd762dd68d6b0a0d7ed48d966f34e to your computer and use it in GitHub Desktop.
Save ojii/2f4cd762dd68d6b0a0d7ed48d966f34e to your computer and use it in GitHub Desktop.
sanic streamed response
class StreamResponse(HTTPResponse):
    """Response whose body is written to the transport in chunks.

    Intended usage, as implemented below:

    1. Construct with the originating ``request`` and the total body
       ``length`` (sent as the ``Content-Length`` header).
    2. Call :meth:`send_headers` exactly once.
    3. Call :meth:`write` for each body chunk.
    4. Return the response; :meth:`output` then yields ``b''`` because
       everything has already been written to the transport.
    """

    def __init__(self, request, length, status=200, headers=None,
                 content_type='text/plain'):
        """Prepare a streamed response.

        Args:
            request: Request object; its ``transport`` and ``version``
                attributes are used when sending.
            length: Total body size, placed in ``Content-Length``.
            status: HTTP status code.
            headers: Optional mapping of extra headers. It is copied, so
                the caller's mapping is not modified.
            content_type: Value passed on to ``HTTPResponse``.
        """
        self.request = request
        self.length = length
        self._headers_sent = False
        # Shallow copy: adding Content-Length must not leak into a dict
        # the caller may reuse for other responses.
        headers = dict(headers) if headers else {}
        headers['Content-Length'] = str(length)
        super().__init__(None, status, headers, content_type)

    def send_headers(self, keep_alive=True, keep_alive_timeout=60):
        """Write the status line and headers to the request's transport.

        Raises:
            RuntimeError: If the headers were already sent.
        """
        if self._headers_sent:
            raise RuntimeError('Headers already sent')
        # Use the parent's output(): this class overrides output() to
        # return b'' once streaming has started.
        head = super().output(
            self.request.version, keep_alive, keep_alive_timeout
        )
        self.request.transport.write(head)
        self._headers_sent = True

    def write(self, chunk):
        """Write one body chunk to the request's transport.

        Args:
            chunk: Data handed to ``transport.write`` unchanged
                (presumably bytes -- confirm against the transport in use).

        Raises:
            RuntimeError: If :meth:`send_headers` has not been called.
        """
        if not self._headers_sent:
            raise RuntimeError('Must send headers before writing')
        # The transport lives on the request; this object never had a
        # ``transport`` attribute of its own.
        self.request.transport.write(chunk)

    def output(self, *args):
        """Return ``b''`` since headers and body were already written.

        Presumably invoked by the framework when the handler returns the
        response -- verify against the sanic version in use.

        Raises:
            RuntimeError: If :meth:`send_headers` has not been called.
        """
        if not self._headers_sent:
            raise RuntimeError('Must send headers before finishing')
        return b''
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment