Skip to content

Instantly share code, notes, and snippets.

@dmckeone
Last active October 4, 2023 18:24
Show Gist options
  • Save dmckeone/b6d2160389640270eb24d4b0555a7768 to your computer and use it in GitHub Desktop.
Save dmckeone/b6d2160389640270eb24d4b0555a7768 to your computer and use it in GitHub Desktop.
Sanic Compress Extension with gzip, deflate, or brotli
"""
Sanic Compress Extension
Compress responses using gzip, deflate or brotli (if [brotli](https://pypi.org/project/Brotli/) or
[brotlicffi](https://pypi.org/project/brotlicffi/) is installed).
Limitations:
* No compression on streaming responses
* No compression on chunked responses
Inspired by:
* sanic-compress: https://github.com/subyraman/sanic_compress
* sanic-brogz: https://pypi.org/project/sanic-brogz/
"""
# Brotli support is optional. Prefer the CFFI binding, fall back to the
# C-extension package, and leave ``brotli`` as None when neither is installed
# so the rest of the module can test ``if brotli:``.
try:
    try:
        import brotlicffi as brotli
    except ImportError:
        import brotli
except ImportError:
    brotli = None
import gzip
import zlib
from asyncio import get_event_loop
from sanic.response import ResponseStream
from sanic_ext import Extension, Extend
# Content types compressed when COMPRESS_MIMETYPES is not set in the app config.
DEFAULT_MIME_TYPES = frozenset(
    {
        "text/html",
        "text/css",
        "text/xml",
        "application/json",
        "application/javascript",
    }
)
def clamp(val, minimum, maximum):
    """
    Clamp value between a minimum and maximum.

    Args:
        val: Value to constrain.
        minimum: Lower bound; returned when ``val`` is below it.
        maximum: Upper bound; returned when ``val`` is above it.

    Returns:
        ``minimum`` if ``val < minimum``, ``maximum`` if ``val > maximum``,
        otherwise ``val`` unchanged.
    """
    # The lower bound is checked first, matching the original comparison order.
    if val < minimum:
        return minimum
    if val > maximum:
        return maximum
    return val
class Compress(Extension):
    """
    Sanic Extension for compressing response content.

    When ``COMPRESS`` is enabled in the app config, an ``on_response`` handler
    is registered that compresses eligible response bodies with brotli (if
    available), gzip or deflate, chosen from the request's ``Accept-Encoding``.

    Config keys (defaults applied by ``_set_defaults``):
        COMPRESS: enable the extension (default ``False``).
        COMPRESS_MIMETYPES: content types to compress.
        COMPRESS_LEVEL: compression level, clamped to 1..9 (default ``6``).
        COMPRESS_MIN_SIZE: minimum body size in bytes (default ``1024``).
    """

    name = "compress"

    def included(self):
        """Return the app's ``COMPRESS`` config value."""
        return self.app.config.COMPRESS

    def startup(self, bootstrap: Extend) -> None:
        """Apply config defaults and, if enabled, register the response hook."""
        app = self.app
        self._set_defaults(app)

        if not self.included():
            return

        @app.on_response
        async def compress_response(request, response):
            await self._compress_response(request, response)

    def label(self):
        """Return a comma-separated, sorted list of available algorithms."""
        types = ["deflate", "gzip"]
        if brotli:
            types.append("brotli")
        return ", ".join(sorted(types))

    @classmethod
    def _set_defaults(cls, app):
        """Fill in any compression settings missing from ``app.config``."""
        config = app.config

        # fmt: off
        # Shared
        config.COMPRESS = config.get("COMPRESS", False)
        config.COMPRESS_MIMETYPES = config.get("COMPRESS_MIMETYPES", DEFAULT_MIME_TYPES)
        config.COMPRESS_LEVEL = config.get("COMPRESS_LEVEL", 6)
        config.COMPRESS_MIN_SIZE = config.get("COMPRESS_MIN_SIZE", 1024)  # 1KB
        # fmt: on

    @staticmethod
    def _accepted_encodings(header):
        """
        Parse an ``Accept-Encoding`` header into a set of acceptable codings.

        Codings are lower-cased and stripped of parameters. A coding with
        ``q=0`` (or an unparseable q-value) is treated as not acceptable.
        Wildcards (``*``) are returned verbatim and not expanded.
        """
        accepted = set()
        for item in header.split(","):
            coding, _, params = item.partition(";")
            coding = coding.strip().lower()
            if not coding:
                continue

            quality = 1.0
            for param in params.split(";"):
                key, _, value = param.partition("=")
                if key.strip().lower() == "q":
                    try:
                        quality = float(value.strip())
                    except ValueError:
                        # Malformed weight: be conservative and do not use it
                        quality = 0.0

            if quality > 0:
                accepted.add(coding)
        return accepted

    async def _compress_response(self, request, response):
        """
        Compress ``response.body`` in place when the response is eligible.

        The response is left untouched when it is a stream, is already
        encoded, is not a 2xx, has a content type outside
        ``COMPRESS_MIMETYPES``, has an empty body or one smaller than
        ``COMPRESS_MIN_SIZE``, or when the client accepts none of the
        supported encodings.
        """
        config = self.app.config

        if isinstance(response, ResponseStream):
            # EARLY EXIT: Streaming responses not yet supported
            return

        if "Content-Encoding" in response.headers:
            # EARLY EXIT: Content already encoded
            return

        if not 200 <= response.status < 300:
            # EARLY EXIT: Only compress valid responses
            return

        content_type = response.content_type
        if content_type and ";" in content_type:
            # Drop parameters such as "; charset=utf-8"
            content_type = content_type.split(";", 1)[0].strip()

        if content_type not in config.COMPRESS_MIMETYPES:
            # EARLY EXIT: Content type not configured for compression
            return

        original_body = response.body
        if not original_body:
            # EARLY EXIT: Nothing to compress (missing or empty body)
            return

        if len(original_body) < config.COMPRESS_MIN_SIZE:
            # EARLY EXIT: Content isn't large enough to compress
            return

        # Select algorithm (server preference: br, then gzip, then deflate)
        accepted = self._accepted_encodings(
            request.headers.get("Accept-Encoding", "")
        )
        if brotli and "br" in accepted:
            compress_fn = self._br
        elif "gzip" in accepted:
            compress_fn = self._gzip
        elif "deflate" in accepted:
            compress_fn = self._deflate
        else:
            # EARLY EXIT: No valid compression algorithms found
            return

        # Select compression
        level = clamp(config.COMPRESS_LEVEL, 1, 9)

        # Compress on background thread -- all compression algorithms are synchronous
        encoding, body = await get_event_loop().run_in_executor(
            None, compress_fn, original_body, level
        )

        # Set response body and headers to match the new settings
        response.body = body
        response.headers["Content-Encoding"] = encoding
        response.headers["Content-Length"] = len(body)

        # Set Vary headers for efficient cache key in browser
        vary = response.headers.get("Vary")
        if vary:
            if "accept-encoding" not in vary.lower():
                response.headers["Vary"] = "{}, Accept-Encoding".format(vary)
        else:
            response.headers["Vary"] = "Accept-Encoding"

    @classmethod
    def _gzip(cls, body, level):
        """Return ``("gzip", compressed_bytes)`` for ``body``."""
        return "gzip", gzip.compress(body, compresslevel=level)

    @classmethod
    def _deflate(cls, body, level):
        """Return ``("deflate", compressed_bytes)`` for ``body``."""
        # zlib.compress takes the level as ``level`` (not ``compresslevel``);
        # pass it positionally.
        return "deflate", zlib.compress(body, level)

    @classmethod
    def _br(cls, body, level):
        """Return ``("br", compressed_bytes)`` for ``body``; requires brotli."""
        return "br", brotli.compress(body, quality=level)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment