-
-
Save dmckeone/b6d2160389640270eb24d4b0555a7768 to your computer and use it in GitHub Desktop.
Sanic Compress Extension with gzip, deflate, or brotli
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Sanic Compress Extension | |
Compress responses using gzip, deflate or brotli (if [brotli](https://pypi.org/project/Brotli/) or | |
[brotlicffi](https://pypi.org/project/brotlicffi/) is installed). | |
Limitations: | |
* No compression on streaming responses | |
* No compression on chunked responess | |
Inspired by: | |
* sanic-compress: https://github.com/subyraman/sanic_compress | |
* sanic-brogz: https://pypi.org/project/sanic-brogz/ | |
""" | |
try: | |
try: | |
import brotlicffi as brotli | |
except ImportError: | |
import brotli | |
except ImportError: | |
brotli = None | |
import gzip | |
import zlib | |
from asyncio import get_event_loop | |
from sanic.response import ResponseStream | |
from sanic_ext import Extension, Extend | |
DEFAULT_MIME_TYPES = frozenset( | |
["text/html", "text/css", "text/xml", "application/json", "application/javascript"] | |
) | |
def clamp(val, minimum, maximum): | |
""" | |
Clamp value between a minimum and maximum. | |
""" | |
if val < minimum: | |
return minimum | |
elif val > maximum: | |
return maximum | |
return val | |
class Compress(Extension): | |
""" | |
Sanic Extension for compressing response content | |
""" | |
name = "compress" | |
def included(self): | |
return self.app.config.COMPRESS | |
def startup(self, bootstrap: Extend) -> None: | |
app = self.app | |
self._set_defaults(app) | |
if not self.included(): | |
return | |
@app.on_response | |
async def compress_response(request, response): | |
await self._compress_response(request, response) | |
def label(self): | |
types = ["deflate", "gzip"] | |
if brotli: | |
types.append("brotli") | |
return ", ".join(sorted(types)) | |
@classmethod | |
def _set_defaults(cls, app): | |
config = app.config | |
# fmt: off | |
# Shared | |
config.COMPRESS = config.get("COMPRESS", False) | |
config.COMPRESS_MIMETYPES = config.get("COMPRESS_MIMETYPES", DEFAULT_MIME_TYPES) | |
config.COMPRESS_LEVEL = config.get("COMPRESS_LEVEL", 6) | |
config.COMPRESS_MIN_SIZE = config.get("COMPRESS_MIN_SIZE", 1024) # 1KB | |
# fmt: on | |
async def _compress_response(self, request, response): | |
if type(response) is ResponseStream: | |
# EARLY EXIT: Streaming responses not yet supported | |
return | |
if "Content-Encoding" in response.headers: | |
# EARLY EXIT: Content already encoded | |
return | |
if not 200 <= response.status < 300: | |
# EARLY EXIT: Only compress valid responses | |
return | |
content_type = response.content_type | |
if content_type and ";" in content_type: | |
content_type = content_type.split(";")[0] | |
if content_type not in self.app.config.COMPRESS_MIMETYPES: | |
# EARLY EXIT: Content type not configured for compression | |
return | |
content_length = len(response.body) | |
if ( | |
content_length is not None | |
and content_length < self.app.config.COMPRESS_MIN_SIZE | |
): | |
# EARLY EXIT: Content isn't large enough to compress | |
return | |
# Select algorithm | |
accept_encoding = request.headers.get("Accept-Encoding", "") | |
accepted = {w.strip() for w in accept_encoding.split(",")} | |
if brotli and "br" in accepted: | |
compress_fn = self._br | |
elif "gzip" in accepted: | |
compress_fn = self._gzip | |
elif "deflate" in accepted: | |
compress_fn = self._deflate | |
else: | |
# EARLY EXIT: No valid compression algorithms found | |
return | |
# Select compression | |
level = clamp(self.app.config.COMPRESS_LEVEL, 1, 9) | |
# Compress on backmonitor thread -- all compression algorithms are synchronous | |
encoding, body = await get_event_loop().run_in_executor( | |
None, compress_fn, response.body, level | |
) | |
# Set response body and headers to match the new settings | |
response.body = body | |
response.headers["Content-Encoding"] = encoding | |
response.headers["Content-Length"] = len(body) | |
# Set Vary headers for efficient cache key in browser | |
vary = response.headers.get("Vary") | |
if vary: | |
if "accept-encoding" not in vary.lower(): | |
response.headers["Vary"] = "{}, Accept-Encoding".format(vary) | |
else: | |
response.headers["Vary"] = "Accept-Encoding" | |
@classmethod | |
def _gzip(cls, body, level): | |
return "gzip", gzip.compress(body, compresslevel=level) | |
@classmethod | |
def _deflate(cls, body, level): | |
return "deflate", zlib.compress(body, compresslevel=level) | |
@classmethod | |
def _br(cls, body, level): | |
return "br", brotli.compress(body, quality=level) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment