Skip to content

Instantly share code, notes, and snippets.

@tipabu
Last active October 22, 2019 23:42
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save tipabu/7c8ed7a713a7f51f20c2 to your computer and use it in GitHub Desktop.
Save tipabu/7c8ed7a713a7f51f20c2 to your computer and use it in GitHub Desktop.
Simple opportunistic gzip encoding for Swift
"""
Middleware to provide on-the-fly compression.
For requests with a supported Transfer-Encoding, this will expose a decoded
body to the rest of the pipeline. If the client indicated support for a
compressed response (depending on configuration, via a TE or Accept-Encoding
header) and the response meets certain conditions, the body will be compressed
using the most-acceptable format.
.. warning::
SECURITY ADVISORY:
It is not recommended to enable this feature in a public-cloud
environment. Malicious users may use this for denial-of-service
attacks by issuing PUTs/GETs for files with a very high compression
ratio.
.. note::
Using this to provide Transfer-Encoding compression violates the WSGI
spec, which states "applications and middleware **must not** apply any
kind of ``Transfer-Encoding`` to their output, such as chunking or
gzipping". Additionally, it may only work properly for eventlet's WSGI
implementation.
Configuration options:
``use_transfer_encoding``
If true, use TE/Transfer-Encoding headers; otherwise, use
Accept-Encoding/Content-Encoding headers. Note that "[t]he
content-coding is a characteristic of the entity identified
by the Request-URI"; by changing it, clients may think there
are integrity errors.
Default: True
``min_content_length``
Only compress content that was originally at least this size
(in bytes). This may be used to avoid compressing small files,
where the resulting stream may be larger than the original.
Default: 1024 (1 KiB)
``min_chunking_length``
If not using transfer-encoding and the original content is
smaller than this, read the entire compressed file into memory
before sending. This allows the proxy to include a
Content-Length header.
Default: 1048576 (1 MiB)
``decode_chunk_size``
When decompressing uploaded objects, send this many bytes at a
time to the decompressor. It is recommended that this be the
same as the client_chunk_size setting for the proxy-server app.
Default: 65536 (64 KiB)
``compression_level``
The compression-level to use. Must be in the range 1 (fastest)
to 9 (most compression).
Default: 6
``content_type_whitelist``
A list of content-types that *should* be compressed. Globs are
not supported.
Default: blank (all content types will be compressed)
``content_type_blacklist``
A list of content-types that *should not* be compressed. Globs
are not supported. If a content-type is listed on both the
whitelist and the blacklist, it will *not* be compressed.
Default: application/gzip, application/x-bzip2, application/zip,
application/x-7z-compressed, application/x-rar-compressed,
application/vnd.debian.binary-package, application/x-rpm,
application/java-archive, application/x-apple-diskimage,
application/vnd.ms-cab-compressed
"""
import bz2
import gzip
import io
import re
import zlib
from swift.common import swob
from swift.common import utils
class GzipCompressor(object):
    """Incremental gzip encoder built on top of ``gzip.GzipFile``.

    ``GzipFile`` writes into a file object, so the compressed bytes are
    collected in an in-memory buffer that is emptied after every write.

    :param compresslevel: compression level handed to ``GzipFile``
    """

    def __init__(self, compresslevel):
        super(GzipCompressor, self).__init__()
        self.buf = io.BytesIO()
        self.gzipper = gzip.GzipFile(
            fileobj=self.buf, mode='wb', compresslevel=compresslevel)

    def _drain(self):
        """Hand back everything buffered so far and empty the buffer."""
        pending = self.buf.getvalue()
        self.buf.seek(0)
        self.buf.truncate(0)
        return pending

    def compress(self, data):
        """Feed ``data`` in; return whatever compressed bytes are ready."""
        self.gzipper.write(data)
        return self._drain()

    def flush(self):
        """Close the gzip stream and return the remaining bytes."""
        self.gzipper.close()
        return self.buf.getvalue()
def compressing_iterator(compressor):
    """Return a function that will compress the output from an iterator.

    :param compressor: callable that takes a compression level and returns
                       an object with ``compress(data)`` and ``flush()``
                       methods
    :returns: a function ``compress_iter(app_iter, conf)``
    """
    def compress_iter(app_iter, conf):
        """Wrap an iterator to be incrementally compressed.

        :param app_iter: iterable of byte strings to compress
        :param conf: mapping providing ``compression_level``
        :returns: a generator yielding the compressed byte strings
        """
        compobj = compressor(conf['compression_level'])
        try:
            for data in app_iter:
                to_send = compobj.compress(data)
                # Compressors buffer internally; skip empty output.
                if to_send:
                    yield to_send
            yield compobj.flush()
        finally:
            # Wrapping app_iter in a generator hides its close() method from
            # whoever consumes the response, so pass the close along here.
            # This runs on exhaustion, on error, and when the generator
            # itself is closed early.
            close = getattr(app_iter, 'close', None)
            if callable(close):
                close()
    return compress_iter
class FileLikeDecompressor(object):
    """A file-like wrapper to decompress other file-like objects.

    Subclasses must provide a ``decompressor`` attribute: a zero-argument
    callable returning an object with a ``decompress(data)`` method.

    :param fileobj: file-like object yielding the compressed stream
    :param chunk_size: number of compressed bytes to request per read of
                       ``fileobj``
    """

    def __init__(self, fileobj, chunk_size=65536):
        super(FileLikeDecompressor, self).__init__()
        self.fileobj = fileobj
        self.chunk_size = chunk_size
        self.decompobj = self.decompressor()
        # Decompressed bytes produced but not yet handed to the caller.
        self.buf = b''
        self.end_of_file = False

    def read(self, size=None):
        """Read ``size`` (decompressed) bytes from the underlying stream.

        :param size: maximum number of decompressed bytes to return; must be
                     a non-negative integer
        :returns: up to ``size`` bytes; fewer only once the underlying
                  stream is exhausted
        :raises ValueError: if ``size`` is None or negative
        :raises swob.HTTPBadRequest: if the stream cannot be decoded
        """
        if size is None or size < 0:
            raise ValueError('%s only supports chunked reads'
                             % self.__class__.__name__)
        to_read = size - len(self.buf)
        if to_read > 0 and not self.end_of_file:
            pieces = [self.buf]
            while to_read > 0:
                chunk = self.fileobj.read(self.chunk_size)
                if not chunk:
                    # Only an empty read marks the end of the input; a short
                    # read may just mean less data was available. The empty
                    # chunk is never passed to the decompressor because
                    # bz2 raises EOFError for any call after end-of-stream.
                    self.end_of_file = True
                    break
                try:
                    chunk = self.decompobj.decompress(chunk)
                except Exception:
                    raise swob.HTTPBadRequest('Error decoding request body')
                pieces.append(chunk)
                to_read -= len(chunk)
            self.buf = b''.join(pieces)
        result, self.buf = self.buf[:size], self.buf[size:]
        return result
class GzipDecompressor(FileLikeDecompressor):
    """File-like reader that decodes gzip- or zlib-framed streams."""

    @staticmethod
    def decompressor():
        """Build the zlib decompression object used for this stream."""
        # Adding 32 to the window size enables header detection, so clients
        # that just use zlib are handled as well as real gzip.
        header_detecting_wbits = zlib.MAX_WBITS + 32
        return zlib.decompressobj(header_detecting_wbits)
class Bzip2Decompressor(FileLikeDecompressor):
    """File-like reader that decodes bz2 streams."""

    # Called with no arguments to create the per-stream decompressor;
    # staticmethod() makes explicit that the instance is never passed in.
    decompressor = staticmethod(bz2.BZ2Decompressor)
# Response encoders, keyed by coding name. Each value is the function returned
# by compressing_iterator(); it is called as encoder(app_iter, conf).
ENCODERS = {
'deflate': compressing_iterator(zlib.compressobj),
'gzip': compressing_iterator(GzipCompressor),
'x-bzip2': compressing_iterator(bz2.BZ2Compressor),
}
# Request-body decoders, keyed by coding name. Each value is a
# FileLikeDecompressor subclass, instantiated as decoder(fileobj, chunk_size).
# NOTE(review): there is no 'deflate' entry here even though ENCODERS has one
# -- confirm whether that asymmetry is intentional.
DECODERS = {
'gzip': GzipDecompressor,
'x-bzip2': Bzip2Decompressor,
}
# Alternate coding names, mapped to the canonical key used in the tables above.
ALIASES = {
'x-gzip': 'gzip',
}
# Server-side preference order handed to AllowedEncodings.best_match().
# Identity comes after encoders but before aliases, so it is preferred
# over aliases unless the client specifically requests the alias
PREFERRRED_ENCODINGS = ['gzip', 'x-bzip2', 'deflate', 'identity', 'x-gzip']
class AllowedEncodings(object):
    """Wraps a Request's Accept-Encoding/TE header as a friendly object.

    Borrows heavily from swift.common.swob.Accept, q.v.

    :param headername: name of the header, used in error messages
    :param headerval: value of the header as a str (or None if absent)
    """

    # RFC 2616 section 2.2
    token = r'[^()<>@,;:\"/\[\]?={}\x00-\x20\x7f]+'
    # Backslash is excluded so that an escape can only be matched by
    # quoted_pair. If both alternatives could consume a backslash, an
    # unterminated quoted string full of backslashes would cause exponential
    # backtracking on a client-controlled header.
    qdtext = r'[^"\\]'
    quoted_pair = r'(?:\\.)'
    quoted_string = r'"(?:' + qdtext + r'|' + quoted_pair + r')*"'
    extension = (r'(?:\s*;\s*(?:' + token + r")\s*=\s*" + r'(?:' + token +
                 r'|' + quoted_string + r'))')
    acc_pattern = re.compile(r'^\s*(' + token + r')(' + extension + r'*?\s*)$')
    # Same shape as ``extension`` but capturing the name and value, so that
    # parameters can be pulled apart without splitting inside quoted strings.
    param_pattern = re.compile(r'\s*;\s*(' + token + r')\s*=\s*(' + token +
                               r'|' + quoted_string + r')')

    def __init__(self, headername, headerval):
        self.headername = headername
        self.headerval = headerval

    def _get_types(self):
        """Get the client's preferred encodings.

        :returns: a list of ``(compiled pattern, quality, is_glob)`` tuples,
                  with explicitly named codings ahead of the ``*`` glob
        :raises ValueError: if the header is malformed, repeats ``q``, or
                            carries a ``q`` outside the range 0..1
        """
        if not self.headerval:
            return []
        types = []
        for typ in self.headerval.split(','):
            type_parms = self.acc_pattern.findall(typ)
            if not type_parms:
                raise ValueError('Invalid %s header' % self.headername)
            typ, parms = type_parms[0]
            seen_q_already = False
            quality = 1.0
            for name, value in self.param_pattern.findall(parms):
                if name != 'q':
                    continue
                if seen_q_already:
                    raise ValueError('Multiple "q" params')
                seen_q_already = True
                quality = float(value)
                # Also rejects NaN, which would make the ordering in
                # best_match() unpredictable.
                if not 0.0 <= quality <= 1.0:
                    raise ValueError('Invalid "q" value')
            globbed = typ == '*'
            pattern = '^' + (self.token if globbed else re.escape(typ)) + '$'
            # Coding names are case-insensitive.
            types.append((re.compile(pattern, re.IGNORECASE), quality,
                          globbed))
        # sort candidates by whether or not there were globs
        types.sort(key=lambda t: t[2])
        return types

    def best_match(self, options):
        """Get the item from "options" that best matches the header.

        :param options: a list of content-/transfer-encodings the server can
                        respond with, most preferred first
        :returns: the most acceptable encoding from "options", or None if no
                  option is acceptable (including when the header is invalid
                  or "options" is empty)
        """
        if not options:
            return None
        try:
            types = self._get_types()
        except ValueError:
            return None
        if not types:
            return options[0]
        option_qualities = []
        for i, option in enumerate(options):
            for pattern, quality, globbed in types:
                if pattern.match(option):
                    # Negated quality sorts best-first; explicit names beat
                    # globs; ties fall back to the server's own order.
                    opt_qual = (-quality, globbed, i)
                    break
            else:
                opt_qual = (0, 0, 0)
            option_qualities.append(opt_qual)
        best = min(range(len(options)), key=option_qualities.__getitem__)
        # A zero quality means "matched with q=0" or "never matched".
        return options[best] if option_qualities[best][0] else None

    def __repr__(self):
        # __repr__ must return a string even when the header was absent.
        return self.headerval or ''
def update_header(headers_dict, header, value):
    """Add a value to a comma-separated header.

    Creates the header if it doesn't already exist (or is empty).
    Doesn't add values that are already present; the comparison is
    case-insensitive and made against whole list elements.

    :param headers_dict: the header dictionary for the response
    :param header: the header whose value should be updated
    :param value: the value that should be added
    """
    header = header.lower()
    if header not in headers_dict or not headers_dict[header]:
        headers_dict[header] = value
        return
    current = headers_dict[header]
    # Compare against whole elements: a plain substring test would find
    # 'te' inside 'content-type' and wrongly skip adding it.
    present = set(item.strip().lower() for item in current.split(','))
    if value.strip().lower() not in present:
        headers_dict[header] = current + ', ' + value
def maybe_decode(req, chunk_size=65536):
    """Check for a Transfer-Encoding header and try to decode.

    If Transfer-Encoding header is present and something we know how to handle,
    replace the request's body_file with an appropriate FileLikeDecompressor.
    Codings are stripped from the end of the list (just before "chunked")
    until one is found that cannot be decoded; the header is then rewritten
    to list only what is left, and any Content-Length header is dropped.

    :param req: the swob request to inspect and modify in place
    :param chunk_size: chunk size handed to each decompressor
    :raises swob.HTTPBadRequest: if "chunked" is not the final coding, or
                                 appears more than once
    """
    if 'transfer-encoding' not in req.headers:
        return
    # Transfer-coding names are case-insensitive, so normalise before
    # comparing against 'chunked' and the decoder tables.
    encodings = [enc.lower() for enc in
                 comma_separated_list(req.headers['transfer-encoding'])]
    if not encodings or encodings[-1] != 'chunked':
        raise swob.HTTPBadRequest(
            'Final Transfer-Encoding must be "chunked"')
    if encodings.index('chunked') != len(encodings) - 1:
        raise swob.HTTPBadRequest(
            'Transfer-Encoding may only include "chunked" once')
    # eventlet only detects chunked encoding when it's the *only* encoding
    if hasattr(req.environ['wsgi.input'], 'wsgi_input'):
        req.environ['wsgi.input'].wsgi_input.chunked_input = True
    # Number of leading codings still applied to the body. Codings are
    # undone in reverse order of application, i.e. right to left.
    remaining = len(encodings) - 1
    while remaining:
        enc = encodings[remaining - 1]
        decoder = DECODERS.get(enc) or DECODERS.get(ALIASES.get(enc))
        if decoder is None:
            break
        req.body_file = decoder(req.body_file, chunk_size)
        remaining -= 1
    req.headers['transfer-encoding'] = ', '.join(
        encodings[:remaining] + ['chunked'])
    req.headers.pop('content-length', '')
class CompressionMiddleware(object):
"""WSGI middleware to de/compress request/response bodies.

Request bodies are passed through maybe_decode(); response bodies are
wrapped in one of the ENCODERS when can_encode() allows it and the client
prefers something other than 'identity'.
"""
def __init__(self, app, conf):
"""Store the wrapped app and a private copy of the configuration.

:param app: the next WSGI application in the pipeline
:param conf: mapping of already-parsed options; 'req_header' and
'resp_header' are added to the copy based on 'use_transfer_encoding'
"""
self.app = app
self.conf = conf.copy()
# Pick the request/response header pair used for negotiation.
if self.conf['use_transfer_encoding']:
self.conf['req_header'] = 'TE'
self.conf['resp_header'] = 'Transfer-Encoding'
else:
self.conf['req_header'] = 'Accept-Encoding'
self.conf['resp_header'] = 'Content-Encoding'
def can_encode(self, resp):
"""Check whether this is a response that should be compressed."""
# The result is the value of an and-chain: truthy/falsy, not strictly a
# bool (e.g. None or 0 when the response has no content length).
return (
# Ensure we have content and that it's larger than some threshold
resp.content_length and
resp.content_length >= self.conf['min_content_length'] and
# Ensure we aren't already doing some sort of encoding
'content-encoding' not in resp.headers and
'transfer-encoding' not in resp.headers and
# Ensure we're on the whitelist (if present)
(not self.conf['content_type_whitelist'] or
resp.content_type in self.conf['content_type_whitelist']) and
# Ensure we're not on the blacklist
resp.content_type not in self.conf['content_type_blacklist'] and
True)
def preferred_encoding(self, req):
"""Get the client's preferred compression method."""
# Cache-Control: no-transform only suppresses compression when the
# Content-Encoding header pair is in use.
if 'no-transform' in req.headers.get('cache-control', '') and \
not self.conf['use_transfer_encoding']:
return 'identity'
# A missing negotiation header is treated as a request for 'identity'.
acceptable = AllowedEncodings(
self.conf['req_header'],
req.headers.get(self.conf['req_header'], 'identity'))
preferred = acceptable.best_match(PREFERRRED_ENCODINGS)
# best_match() returns None when nothing is acceptable.
return preferred or 'identity'
@swob.wsgify
def __call__(self, req):
"""Decode the request body if needed, then maybe compress the response."""
maybe_decode(req, self.conf['decode_chunk_size'])
resp = req.get_response(self.app)
# Remember the uncompressed length before app_iter is replaced below.
orig_len = resp.content_length
encoding = self.preferred_encoding(req)
if not self.can_encode(resp) or encoding == 'identity':
return resp
# The negotiated name may be an alias (e.g. 'x-gzip'); resolve it to find
# the encoder, but echo back the name the client asked for.
encoder = ENCODERS.get(encoding)
if not encoder:
encoder = ENCODERS[ALIASES[encoding]]
resp.headers[self.conf['resp_header']] = encoding
resp.app_iter = encoder(resp.app_iter, self.conf)
# NOTE(review): indentation was lost in this copy of the file. The module
# docstring says min_chunking_length applies only when not using
# transfer-encoding, which suggests the buffering below belongs inside this
# branch together with the Warning/Vary updates -- confirm against the
# original before relying on it.
if not self.conf['use_transfer_encoding']:
warning = '214 proxy-server:compression "%s modified"' % \
self.conf['resp_header']
update_header(resp.headers, 'warning', warning)
update_header(resp.headers, 'vary', self.conf['req_header'])
if orig_len is not None and \
orig_len < self.conf['min_chunking_length']:
# Should fit comfortably in memory
resp.app_iter = list(resp.app_iter)
return resp
def comma_separated_list(value):
    """Split a comma-separated string into a list of stripped items.

    Lists and tuples are handed back untouched.
    """
    if not isinstance(value, (list, tuple)):
        value = [item.strip() for item in value.split(',')]
    return value
def compression_factory(global_conf, **local_conf):
    """Factory for building new compression filters.

    :param global_conf: global paste-deploy configuration (unused)
    :param local_conf: filter-specific options; anything missing falls back
                       to the defaults below
    :returns: a function that wraps an app in CompressionMiddleware
    :raises ValueError: if an option cannot be converted, or
                        ``compression_level`` is outside 1..9
    """
    def content_type_list(value):
        # A blank setting would otherwise become [''], which is truthy and
        # so would act as a whitelist that matches nothing.
        return [item for item in comma_separated_list(value) if item]

    # option name -> (converter, default)
    options = {
        'use_transfer_encoding': (utils.config_true_value, True),
        # maybe sane defaults?
        'min_content_length': (int, 1024),
        'min_chunking_length': (int, 1024 * 1024),
        'decode_chunk_size': (int, 64 * 1024),
        # default from zlib
        'compression_level': (int, 6),
        'content_type_whitelist': (content_type_list, []),
        'content_type_blacklist': (content_type_list, [
            'application/gzip',
            'application/x-bzip2',
            'application/zip',
            'application/x-7z-compressed',
            'application/x-rar-compressed',
            'application/vnd.debian.binary-package',
            'application/x-rpm',
            'application/java-archive',
            'application/x-apple-diskimage',
            'application/vnd.ms-cab-compressed',
        ]),
    }
    # Build a fresh dict rather than updating one while iterating over it.
    conf = dict(
        (name, convert(local_conf.get(name, default)))
        for name, (convert, default) in options.items())
    # Fail at load time instead of on the first compressed response.
    if not 1 <= conf['compression_level'] <= 9:
        raise ValueError('compression_level must be in the range 1 to 9')
    # Copy so that nothing downstream can mutate the module-level list.
    conf['encodings'] = list(PREFERRRED_ENCODINGS)
    if hasattr(utils, 'register_swift_info'):  # swift>=1.11.0
        utils.register_swift_info('compression', **conf)

    def compression_filter(app):
        """Building a new compression filter."""
        return CompressionMiddleware(app, conf)
    return compression_filter
from __future__ import unicode_literals
import bz2
import gzip
import io
import mock
import unittest
import zlib
from swift.common import swob
import compression as c
def gzip_file(data):
    """Return a BytesIO, rewound to the start, holding ``data`` gzipped."""
    buf = io.BytesIO()
    with gzip.GzipFile(fileobj=buf, mode='wb') as gz:
        gz.write(data)
    buf.seek(0)
    return buf
# time.time is patched for every test method so that the gzip header is
# reproducible; the patched value shows up (little-endian) as the
# \xef\xbe\xad\xde bytes in the expected headers below.
@mock.patch('compression.gzip.time.time', return_value=0xdeadbeef)
class GzipCompressorTest(unittest.TestCase):
"""Byte-exact tests for GzipCompressor's incremental output."""
def _test(self, cases, compression_level=0, compobj=None):
"""Run (input, expected output) pairs through a compressor.

An input of None means "call flush()"; anything else is passed to
compress(). Returns the compressor so it can be reused.
"""
if compobj is None:
compobj = c.GzipCompressor(compression_level)
for data_in, data_out in cases:
if data_in is None:
self.assertEqual(data_out, compobj.flush())
else:
self.assertEqual(data_out, compobj.compress(data_in))
return compobj
def test_empty_file(self, mock_time):
# The [2:-4] slice drops the first 2 and last 4 bytes of the zlib output
# (presumably the zlib header and checksum), leaving the deflate data that
# sits between the gzip header and the 8-byte gzip trailer.
self._test([(None, (b'\x1f\x8b\x08\x00\xef\xbe\xad\xde\x02\xff' +
zlib.compress(b'', 0)[2:-4] +
b'\x00\x00\x00\x00\x00\x00\x00\x00'))], 0)
self._test([(None, (b'\x1f\x8b\x08\x00\xef\xbe\xad\xde\x02\xff' +
zlib.compress(b'', 9)[2:-4] +
b'\x00\x00\x00\x00\x00\x00\x00\x00'))], 9)
def test_junk_data(self, mock_time):
# Small writes are expected to produce only the header until flush().
self._test([(b'junk', b'\x1f\x8b\x08\x00\xef\xbe\xad\xde\x02\xff'),
(b'data', b''),
(None, (zlib.compress(b'junkdata', 0)[2:-4] +
b'\x9f\x98\xf2\xca\x08\x00\x00\x00'))], 0)
self._test([(b'junk', b'\x1f\x8b\x08\x00\xef\xbe\xad\xde\x02\xff'),
(b'data', b''),
(None, (b'\xcb\x2a\xcd\xcb\x4e\x49\x2c\x49\x04\x00'
b'\x9f\x98\xf2\xca\x08\x00\x00\x00'))], 9)
def test_flushed_cannot_compress(self, mock_time):
# Once flush() has closed the stream, compress() must raise.
comp = c.GzipCompressor(0)
comp.flush()
try:
comp.compress(b'')
except ValueError as exc:
self.assertIn('I/O operation on closed file', exc.args[0])
else:
self.fail('Expected ValueError to be raised.')
def test_long_input(self, mock_time):
# With enough input, compressed data is expected before flush().
# NOTE(review): the exact split between calls depends on GzipFile/zlib
# internals -- confirm on the Python versions you support.
data = b'a' * (2 ** 14)
self._test([(data, b'\x1f\x8b\x08\x00\xef\xbe\xad\xde\x02\xff'),
(data, b'\x00\x00\x80\xff\x7f' + data * 2),
(None, (b'\x01\x00\x00\xff\xff'
b'\xe3\x5f\xda\xec\x00\x80\x00\x00'))], 0)
class CompressingIteratorTest(unittest.TestCase):
class FakeCompressor(object):
def __init__(self, compression_level):
self.compression_level = compression_level
def compress(self, data):
return data
def flush(self):
return self.compression_level
def test_compressing_iterator(self):
app_iter = [b'some', b'junk', b'data']
comp = c.compressing_iterator(CompressingIteratorTest.FakeCompressor)
out_iter = comp(app_iter, {'compression_level': 9})
self.assertEqual(app_iter + [9], list(out_iter))
class BadRequestTester(unittest.TestCase):
    """Base class adding an assertion for 400 Bad Request responses."""

    def assert_bad_request(self, msg, func, *args, **kwargs):
        """Call ``func`` and require a 400 HTTPException whose body is ``msg``."""
        try:
            func(*args, **kwargs)
        except swob.HTTPException as exc:
            caught = exc
        else:
            self.fail('Expected HTTPException to be raised')
        self.assertEqual('400 Bad Request', caught.status)
        self.assertEqual(msg.encode('ascii'), caught.body)
class GzipDecompressorTest(BadRequestTester):
    """Tests for GzipDecompressor against gzip and zlib input."""

    def _assert_reads(self, decomp, steps):
        """Check a sequence of (size, expected bytes) reads."""
        for size, expected in steps:
            self.assertEqual(expected, decomp.read(size))

    def test_no_slurping(self):
        decomp = c.GzipDecompressor(gzip_file(b'Hello, world!'))
        try:
            decomp.read()
        except ValueError as exc:
            self.assertIn('only supports chunked reads', exc.args[0])
        else:
            self.fail('Expected ValueError to be raised.')

    def test_gzipped_file(self):
        decomp = c.GzipDecompressor(gzip_file(b'Hello, world!'))
        self._assert_reads(decomp, [(7, b'Hello, '), (7, b'world!')])

    def test_zlibbed_file(self):
        stored = io.BytesIO(zlib.compress(b'Hello, world!', 0))
        self._assert_reads(c.GzipDecompressor(stored), [
            (3, b'Hel'),
            (3, b'lo,'),
            (3, b' wo'),
            (30, b'rld!'),
        ])
        packed = io.BytesIO(zlib.compress(b'Hello, world!', 9))
        self._assert_reads(c.GzipDecompressor(packed),
                           [(30, b'Hello, world!')])

    def test_empty_file(self):
        decomp = c.GzipDecompressor(io.BytesIO())
        self.assertEqual(b'', decomp.read(1))

    def test_bad_file(self):
        decomp = c.GzipDecompressor(io.BytesIO(b'foo'))
        self.assert_bad_request('Error decoding request body', decomp.read, 1)
class Bzip2DecompressorTest(BadRequestTester):
    """Tests for Bzip2Decompressor."""

    def test_no_slurping(self):
        # The payload's format is irrelevant here: read() with no size is
        # expected to be refused.
        decomp = c.Bzip2Decompressor(gzip_file(b'Hello, world!'))
        try:
            decomp.read()
        except ValueError as exc:
            self.assertIn('only supports chunked reads', exc.args[0])
        else:
            self.fail('Expected ValueError to be raised.')

    def test_bzipped_file(self):
        decomp = c.Bzip2Decompressor(io.BytesIO(bz2.compress(b'foo bar baz')))
        for expected in (b'foo', b' ba', b'r b', b'az'):
            self.assertEqual(expected, decomp.read(3))

    def test_empty_file(self):
        decomp = c.Bzip2Decompressor(io.BytesIO())
        self.assertEqual(b'', decomp.read(1))

    def test_bad_file(self):
        decomp = c.Bzip2Decompressor(io.BytesIO(b'foo'))
        self.assert_bad_request('Error decoding request body', decomp.read, 1)
class AllowedEncodingsTest(unittest.TestCase):
    """Table-driven tests for AllowedEncodings.best_match()."""

    options = ['gzip', 'x-bzip2', 'baz']

    def _best(self, headername, headerval, options=None):
        """Return best_match() for a header, defaulting to self.options."""
        if options is None:
            options = self.options
        return c.AllowedEncodings(headername, headerval).best_match(options)

    def test_invalid(self):
        for headerval in (',', 'baz;q=', 'baz;q=1;q=3', 'baz;q=qq'):
            self.assertEqual(None, self._best('Accept-Encoding', headerval))

    def test_acceptable(self):
        cases = [
            ('Accept-Encoding', None, 'gzip'),
            ('Accept-Encoding', '*', 'gzip'),
            ('Accept-Encoding', 'foo, bar;q=0,baz', 'baz'),
            ('TE', '*, bar;q=0,baz', 'baz'),
            ('TE', '*, bar;q=0, baz;q=.5', 'gzip'),
            ('TE', '*;q=0,gzip', 'gzip'),
            ('Accept-Encoding', '*,gzip;q=0', 'x-bzip2'),
        ]
        for headername, headerval, expected in cases:
            self.assertEqual(expected, self._best(headername, headerval))

    def test_not_acceptable(self):
        # 'baz' is acceptable to the client but not offered by the server.
        self.assertEqual(None, self._best(
            'Accept-Encoding', 'foo, bar;q=0,baz', self.options[:-1]))
        for headerval in ('foo, bar;q=0,baz;q=0', 'foo', 'identity'):
            self.assertEqual(None, self._best('Accept-Encoding', headerval))

    def test_repr(self):
        header_val = 'foo, bar;q=0,baz'
        header = c.AllowedEncodings('Accept-Encoding', header_val)
        self.assertEqual(header_val, repr(header))
class UpdateHeaderTest(unittest.TestCase):
    """Tests for update_header()."""

    def _updated(self, header, value):
        """Apply update_header() to a fresh {'foo': 'bar,baz'} dict."""
        headers = {'foo': 'bar,baz'}
        c.update_header(headers, header, value)
        return headers

    def test_header_value_already_present(self):
        self.assertEqual({'foo': 'bar,baz'}, self._updated('foo', 'BAR'))

    def test_header_already_present(self):
        self.assertEqual({'foo': 'bar,baz, quux'},
                         self._updated('foo', 'quux'))

    def test_header_not_present(self):
        self.assertEqual({'foo': 'bar,baz', 'foobar': 'quux'},
                         self._updated('foobar', 'quux'))
def _fake_req(headers=None, data=b''):
    """Build a blank request whose wsgi.input mimics eventlet's wrapper.

    With a Transfer-Encoding header the data is supplied as a raw
    ``wsgi.input`` stream; otherwise it is passed as the request body.
    """
    headers = headers or {}
    transfer_encoding = headers.get('Transfer-Encoding')
    if transfer_encoding:
        body_kwargs = {'environ': {'wsgi.input': io.BytesIO(data)}}
    else:
        body_kwargs = {'body': data}
    req = swob.Request.blank('/', headers=headers, **body_kwargs)
    raw_input = req.environ['wsgi.input']
    raw_input.wsgi_input = mock.Mock()
    # eventlet only detects chunked encoding when it's the *only* encoding
    raw_input.wsgi_input.chunked_input = (transfer_encoding == 'chunked')
    return req
class MaybeDecodeTest(BadRequestTester):
    """Tests for maybe_decode()."""

    def _assert_decodes_hello(self, req, remaining_te):
        """Decode ``req`` and check the body and the rewritten environ.

        The body must read back as b'Hello, world!' and the environ must be
        unchanged apart from wsgi.input and a Transfer-Encoding header of
        ``remaining_te``.
        """
        expected = req.environ.copy()
        expected['HTTP_TRANSFER_ENCODING'] = remaining_te
        base_input = expected.pop('wsgi.input')
        self.assertFalse(base_input.wsgi_input.chunked_input)
        c.maybe_decode(req)
        self.assertTrue(base_input.wsgi_input.chunked_input)
        new_input = req.environ.pop('wsgi.input')
        for piece in (b'Hello, ', b'world!'):
            self.assertEqual(piece, new_input.read(7))
        self.assertEqual(expected, req.environ)

    def test_no_transfer_encoding(self):
        req = swob.Request.blank('/')
        before = req.__dict__.copy()
        c.maybe_decode(req)
        self.assertEqual(before, req.__dict__)

    def test_just_chunked(self):
        req = _fake_req({'Transfer-Encoding': 'chunked'})
        before = req.__dict__.copy()
        c.maybe_decode(req)
        self.assertEqual(before, req.__dict__)
        self.assertTrue(req.environ['wsgi.input'].wsgi_input.chunked_input)

    def test_no_chunked(self):
        req = _fake_req({'Transfer-Encoding': 'gzip'})
        self.assert_bad_request('Final Transfer-Encoding must be "chunked"',
                                c.maybe_decode, req)

    def test_two_chunked(self):
        req = _fake_req({'Transfer-Encoding': 'chunked, gzip, chunked'})
        self.assert_bad_request(
            'Transfer-Encoding may only include "chunked" once',
            c.maybe_decode, req)

    def test_gzip(self):
        payload = gzip_file(b'Hello, world!').getvalue()
        req = _fake_req({'Transfer-Encoding': 'gzip, chunked'}, payload)
        self._assert_decodes_hello(req, 'chunked')

    def test_bzip2(self):
        payload = bz2.compress(b'Hello, world!')
        req = _fake_req({'Transfer-Encoding': 'x-bzip2, chunked'}, payload)
        self._assert_decodes_hello(req, 'chunked')

    def test_partial(self):
        # Only the trailing x-gzip can be stripped; 'foo' stops the decoding.
        payload = gzip_file(b'Hello, world!').getvalue()
        req = _fake_req(
            {'Transfer-Encoding': 'x-bzip2,foo,x-gzip, chunked'}, payload)
        self._assert_decodes_hello(req, 'x-bzip2, foo, chunked')

    def test_nested(self):
        payload = bz2.compress(gzip_file(b'Hello, world!').getvalue())
        req = _fake_req({'Transfer-Encoding': 'gzip, x-bzip2, chunked'},
                        payload)
        self._assert_decodes_hello(req, 'chunked')
class CompressionMiddlewareTest(unittest.TestCase):
def test_defaults(self):
fake_app = object()
mware = c.compression_factory({})(fake_app)
self.assertIs(fake_app, mware.app)
self.assertEqual({
'use_transfer_encoding': True,
'req_header': 'TE',
'resp_header': 'Transfer-Encoding',
'encodings': ['gzip', 'x-bzip2', 'deflate', 'identity', 'x-gzip'],
'compression_level': 6,
'content_type_whitelist': [],
'content_type_blacklist': [
'application/gzip',
'application/x-bzip2',
'application/zip',
'application/x-7z-compressed',
'application/x-rar-compressed',
'application/vnd.debian.binary-package',
'application/x-rpm',
'application/java-archive',
'application/x-apple-diskimage',
'application/vnd.ms-cab-compressed',
],
'decode_chunk_size': 2 ** 16,
'min_chunking_length': 2 ** 20,
'min_content_length': 2 ** 10,
}, mware.conf)
def test_defaults_content_encoding(self):
fake_app = object()
mware = c.compression_factory(
{}, use_transfer_encoding=False)(fake_app)
self.assertIs(fake_app, mware.app)
self.maxDiff = 2048
self.assertEqual({
'use_transfer_encoding': False,
'req_header': 'Accept-Encoding',
'resp_header': 'Content-Encoding',
'encodings': ['gzip', 'x-bzip2', 'deflate', 'identity', 'x-gzip'],
'compression_level': 6,
'content_type_whitelist': [],
'content_type_blacklist': [
'application/gzip',
'application/x-bzip2',
'application/zip',
'application/x-7z-compressed',
'application/x-rar-compressed',
'application/vnd.debian.binary-package',
'application/x-rpm',
'application/java-archive',
'application/x-apple-diskimage',
'application/vnd.ms-cab-compressed',
],
'decode_chunk_size': 2 ** 16,
'min_chunking_length': 2 ** 20,
'min_content_length': 2 ** 10,
}, mware.conf)
def _make_mware(self, conf, fake_app=None):
fake_app = fake_app or object()
conf = conf.copy()
conf['min_content_length'] = 10
defaults = {
'compression_level': 6,
'min_chunking_length': 20,
'decode_chunk_size': 1024,
'use_transfer_encoding': True,
'content_type_whitelist': [],
'content_type_blacklist': [],
}
for k, v in defaults.items():
conf.setdefault(k, v)
mware = c.CompressionMiddleware(fake_app, conf)
self.assertIs(fake_app, mware.app)
self.assertEqual(10, mware.conf.get('min_content_length'))
return mware
def test_can_encode_no_lists(self):
mware = self._make_mware({})
self.assertEqual([], mware.conf.get('content_type_whitelist'))
self.assertEqual([], mware.conf.get('content_type_blacklist'))
# Above/at/below min_content_length:
resp = swob.Response(body=b'a' * 11)
self.assertTrue(mware.can_encode(resp))
resp = swob.Response(body=b'a' * 10)
self.assertTrue(mware.can_encode(resp))
resp = swob.Response(body=b'a' * 9)
self.assertFalse(mware.can_encode(resp))
resp = swob.Response()
self.assertFalse(mware.can_encode(resp))
# Already being encoded:
resp = swob.Response(body=b'a' * 11,
headers={'content-encoding': 'foo'})
self.assertFalse(mware.can_encode(resp))
resp = swob.Response(body=b'a' * 11,
headers={'transfer-encoding': 'bar'})
self.assertFalse(mware.can_encode(resp))
def test_can_encode_whitelist(self):
mware = self._make_mware({
'content_type_whitelist': ['text/foo', 'text/bar'],
})
self.assertEqual(['text/foo', 'text/bar'],
mware.conf.get('content_type_whitelist'))
self.assertEqual([], mware.conf.get('content_type_blacklist'))
resp = swob.Response(body=b'a' * 11)
self.assertFalse(mware.can_encode(resp))
resp = swob.Response(body=b'a' * 11,
headers={'content-type': 'text/foo'})
self.assertTrue(mware.can_encode(resp))
resp = swob.Response(body=b'a' * 11,
headers={'content-type': 'application/foo'})
self.assertFalse(mware.can_encode(resp))
resp = swob.Response(body=b'a' * 11,
headers={'content-type': 'text/bar'})
self.assertTrue(mware.can_encode(resp))
def test_can_encode_blacklist(self):
mware = self._make_mware({
'content_type_blacklist': ['text/foo', 'text/bar'],
})
self.assertEqual([], mware.conf.get('content_type_whitelist'))
self.assertEqual(['text/foo', 'text/bar'],
mware.conf.get('content_type_blacklist'))
resp = swob.Response(body=b'a' * 11)
self.assertTrue(mware.can_encode(resp))
resp = swob.Response(body=b'a' * 11,
headers={'content-type': 'text/foo'})
self.assertFalse(mware.can_encode(resp))
resp = swob.Response(body=b'a' * 11,
headers={'content-type': 'application/foo'})
self.assertTrue(mware.can_encode(resp))
resp = swob.Response(body=b'a' * 11,
headers={'content-type': 'text/bar'})
self.assertFalse(mware.can_encode(resp))
def test_can_encode_white_and_blacklist(self):
mware = self._make_mware({
'content_type_whitelist': ['application/foo', 'text/foo'],
'content_type_blacklist': ['text/foo', 'text/bar'],
})
self.assertEqual(['application/foo', 'text/foo'],
mware.conf.get('content_type_whitelist'))
self.assertEqual(['text/foo', 'text/bar'],
mware.conf.get('content_type_blacklist'))
resp = swob.Response(body=b'a' * 11)
self.assertFalse(mware.can_encode(resp))
resp = swob.Response(body=b'a' * 11,
headers={'content-type': 'text/foo'})
self.assertFalse(mware.can_encode(resp))
resp = swob.Response(body=b'a' * 11,
headers={'content-type': 'application/foo'})
self.assertTrue(mware.can_encode(resp))
resp = swob.Response(body=b'a' * 11,
headers={'content-type': 'text/bar'})
self.assertFalse(mware.can_encode(resp))
def test_preferred_content_encoding(self):
mware = self._make_mware({'use_transfer_encoding': False})
req = _fake_req()
self.assertEqual('identity', mware.preferred_encoding(req))
req = _fake_req({'te': 'gzip'})
self.assertEqual('identity', mware.preferred_encoding(req))
req = _fake_req({'cache-control': 'foo, no-transform, bar',
'accept-encoding': 'gzip'})
self.assertEqual('identity', mware.preferred_encoding(req))
for encoding in ('gzip', 'x-gzip', 'x-bzip2', 'deflate'):
req = _fake_req({'accept-encoding': 'foo,' + encoding})
self.assertEqual(encoding, mware.preferred_encoding(req))
req = _fake_req({'accept-encoding': encoding})
self.assertEqual(encoding, mware.preferred_encoding(req))
req = _fake_req({'accept-encoding': 'foo'})
self.assertEqual('identity', mware.preferred_encoding(req))
req = _fake_req({'accept-encoding': '*'})
self.assertEqual('gzip', mware.preferred_encoding(req))
req = _fake_req({'accept-encoding': '*, gzip; q=0'})
self.assertEqual('x-bzip2', mware.preferred_encoding(req))
req = _fake_req({'accept-encoding': '*, gzip; q=0, x-bzip2;q=0.1'})
self.assertEqual('deflate', mware.preferred_encoding(req))
acceptable = '*, gzip; q=0, x-bzip2;q=0.1 ,deflate ;q=0.5'
req = _fake_req({'accept-encoding': acceptable})
self.assertEqual('identity', mware.preferred_encoding(req))
acceptable = '*, gzip; q=0, x-bzip2;q=0, deflate;q=0'
req = _fake_req({'accept-encoding': acceptable})
self.assertEqual('identity', mware.preferred_encoding(req))
req = _fake_req({'accept-encoding': 'identity,gzip;q=0.5'})
self.assertEqual('identity', mware.preferred_encoding(req))
req = _fake_req({'accept-encoding': 'identity;q=0'})
self.assertEqual('identity', mware.preferred_encoding(req))
def test_preferred_transfer_encoding(self):
    """Check ``preferred_encoding`` with the default (empty) config.

    The test first confirms that ``use_transfer_encoding`` is truthy in
    the middleware's conf, then drives the negotiation through ``TE``
    request headers and compares the reported encoding name.
    """
    middleware = self._make_mware({})
    self.assertTrue(middleware.conf.get('use_transfer_encoding'))

    def negotiated(*req_args):
        # Forward the arguments untouched so that the header-less case
        # still calls ``_fake_req()`` with no arguments at all.
        return middleware.preferred_encoding(_fake_req(*req_args))

    # No headers, or only an Accept-Encoding header: identity expected.
    self.assertEqual('identity', negotiated())
    self.assertEqual('identity', negotiated({'accept-encoding': 'gzip'}))

    # Here a no-transform cache directive is expected to leave the
    # TE-based choice at gzip.
    self.assertEqual('gzip', negotiated({
        'cache-control': 'foo, no-transform, bar',
        'te': 'gzip',
    }))

    # Every name below is expected back verbatim, whether it follows an
    # unrecognized 'foo' entry or is offered on its own.
    for name in ('gzip', 'x-gzip', 'x-bzip2', 'deflate'):
        for offered in ('foo,' + name, name):
            self.assertEqual(name, negotiated({'te': offered}))

    # (expected result, TE value), checked in this order.
    remaining_cases = [
        ('identity', 'foo'),
        ('gzip', '*'),
        ('x-bzip2', '*, gzip; q=0'),
        ('deflate', '*, gzip; q=0, x-bzip2;q=0'),
        ('identity', '*, gzip; q=0, x-bzip2;q=0 ,deflate ;q=0'),
        ('identity', 'identity,gzip;q=0.5'),
        ('identity', 'identity;q=0'),
    ]
    for expected, te_value in remaining_cases:
        self.assertEqual(expected, negotiated({'te': te_value}))
@mock.patch('compression.maybe_decode')
def test_call_pass_through(self, mock_decode):
    """Call the middleware for a request that carries no extra headers.

    ``compression.maybe_decode`` is replaced by a mock for the duration
    of the test. The wrapped app's status and body chunks are expected
    to come back unchanged, with a ``Content-Length`` of 13 reported
    next to the app's own ``Content-Type``.
    """
    app_chunks = (b'some', b'extra', b'data')

    def fake_app(environ, start_response):
        fake_app.calls.append((environ, start_response))
        start_response('299 FOO', [('Content-Type', 'text/plain')])
        # Hand back a fresh list on every invocation.
        return list(app_chunks)
    fake_app.calls = []

    middleware = self._make_mware({}, fake_app)
    environ = _fake_req().environ
    start_response_mock = mock.Mock()
    result = middleware(environ, start_response_mock)

    # One recorded call on the patched decoder, and one on start_response.
    self.assertEqual(1, len(mock_decode.mock_calls))
    self.assertEqual(1, len(start_response_mock.mock_calls))

    # A recorded call is indexable as (name, args, kwargs).
    recorded = start_response_mock.mock_calls[0]
    positional, keyword = recorded[1], recorded[2]
    self.assertEqual(2, len(positional))
    self.assertEqual(0, len(keyword))
    self.assertEqual('299 FOO', positional[0])

    sent_headers = dict(positional[1])
    self.assertIn('Content-Length', sent_headers)
    expected_headers = {
        'Content-Type': 'text/plain',
        'Content-Length': '13'
    }
    self.assertEqual(expected_headers, sent_headers)
    self.assertEqual(list(app_chunks), result)
@mock.patch('compression.maybe_decode')
def test_call_transfer_encoding(self, mock_decode):
    """Call the middleware for a request that sends ``TE: gzip``.

    ``compression.maybe_decode`` is replaced by a mock for the duration
    of the test. The response headers are expected to gain
    ``Transfer-Encoding: gzip`` and to carry no ``Content-Length``, and
    the returned iterable must not produce any empty chunk.
    """
    def fake_app(environ, start_response):
        fake_app.calls.append((environ, start_response))
        start_response('299 FOO', [('Content-Type', 'text/plain')])
        return [b'some', b'extra', b'data']
    fake_app.calls = []

    middleware = self._make_mware({}, fake_app)
    environ = _fake_req({'te': 'gzip'}).environ
    start_response_mock = mock.Mock()
    result = middleware(environ, start_response_mock)

    # One recorded call on the patched decoder, and one on start_response.
    self.assertEqual(1, len(mock_decode.mock_calls))
    self.assertEqual(1, len(start_response_mock.mock_calls))

    # A recorded call is indexable as (name, args, kwargs).
    recorded = start_response_mock.mock_calls[0]
    positional, keyword = recorded[1], recorded[2]
    self.assertEqual(2, len(positional))
    self.assertEqual(0, len(keyword))
    self.assertEqual('299 FOO', positional[0])

    sent_headers = dict(positional[1])
    self.assertNotIn('Content-Length', sent_headers)
    expected_headers = {
        'Content-Type': 'text/plain',
        'Transfer-Encoding': 'gzip',
    }
    self.assertEqual(expected_headers, sent_headers)

    # Walk the whole body; any falsy chunk fails the test.
    for position, piece in enumerate(result):
        if piece:
            continue
        self.fail('Got empty chunk at position %d' % position)
@mock.patch('compression.maybe_decode')
def test_call_content_encoding_buffered_gzip(self, mock_decode):
    """Call the middleware in content-encoding mode, asking for x-gzip.

    ``compression.maybe_decode`` is replaced by a mock for the duration
    of the test. The response is expected to report a ``Content-Length``
    equal to the size of the joined body, to carry ``Content-Encoding``,
    ``Vary`` and ``Warning`` headers, and to decompress back to the
    app's original bytes.
    """
    def fake_app(environ, start_response):
        fake_app.calls.append((environ, start_response))
        start_response('499 BAR', [('Content-Type', 'text/plain')])
        return [b'some', b'extra', b'data']
    fake_app.calls = []

    middleware = self._make_mware(
        {'use_transfer_encoding': False}, fake_app)
    request_headers = {'content-encoding': 'x-gzip',
                       'accept-encoding': 'x-gzip'}
    environ = _fake_req(request_headers).environ
    start_response_mock = mock.Mock()
    app_iter = middleware(environ, start_response_mock)

    # One recorded call on the patched decoder, and one on start_response.
    self.assertEqual(1, len(mock_decode.mock_calls))
    self.assertEqual(1, len(start_response_mock.mock_calls))

    # A recorded call is indexable as (name, args, kwargs).
    recorded = start_response_mock.mock_calls[0]
    positional, keyword = recorded[1], recorded[2]
    self.assertEqual(2, len(positional))
    self.assertEqual(0, len(keyword))
    self.assertEqual('499 BAR', positional[0])

    sent_headers = dict(positional[1])
    self.assertIn('Content-Length', sent_headers)
    payload = b''.join(app_iter)
    # Remove Content-Length so the remaining headers can be compared
    # against a fixed dict below.
    declared_length = sent_headers.pop('Content-Length')
    self.assertEqual(declared_length, str(len(payload)))

    warning = ('214 proxy-server:compression '
               '"Content-Encoding modified"')
    expected_headers = {
        'Content-Type': 'text/plain',
        'Content-Encoding': 'x-gzip',
        'Vary': 'Accept-Encoding',
        'Warning': warning,
    }
    self.assertEqual(expected_headers, sent_headers)
    # wbits=31 makes zlib expect a gzip header and trailer.
    self.assertEqual(b'someextradata', zlib.decompress(payload, 31))
@mock.patch('compression.maybe_decode')
def test_call_content_encoding_buffered_deflate(self, mock_decode):
    """Call the middleware in content-encoding mode, asking for deflate.

    ``compression.maybe_decode`` is replaced by a mock for the duration
    of the test. The response is expected to report a ``Content-Length``
    equal to the size of the joined body, to carry ``Content-Encoding``,
    ``Vary`` and ``Warning`` headers, and to decompress back to the
    app's original bytes.
    """
    def fake_app(environ, start_response):
        fake_app.calls.append((environ, start_response))
        start_response('499 BAR', [('Content-Type', 'text/plain')])
        return [b'some', b'extra', b'data']
    fake_app.calls = []

    middleware = self._make_mware(
        {'use_transfer_encoding': False}, fake_app)
    request_headers = {'content-encoding': 'deflate',
                       'accept-encoding': 'deflate'}
    environ = _fake_req(request_headers).environ
    start_response_mock = mock.Mock()
    app_iter = middleware(environ, start_response_mock)

    # One recorded call on the patched decoder, and one on start_response.
    self.assertEqual(1, len(mock_decode.mock_calls))
    self.assertEqual(1, len(start_response_mock.mock_calls))

    # A recorded call is indexable as (name, args, kwargs).
    recorded = start_response_mock.mock_calls[0]
    positional, keyword = recorded[1], recorded[2]
    self.assertEqual(2, len(positional))
    self.assertEqual(0, len(keyword))
    self.assertEqual('499 BAR', positional[0])

    sent_headers = dict(positional[1])
    self.assertIn('Content-Length', sent_headers)
    payload = b''.join(app_iter)
    # Remove Content-Length so the remaining headers can be compared
    # against a fixed dict below.
    declared_length = sent_headers.pop('Content-Length')
    self.assertEqual(declared_length, str(len(payload)))

    warning = ('214 proxy-server:compression '
               '"Content-Encoding modified"')
    expected_headers = {
        'Content-Type': 'text/plain',
        'Content-Encoding': 'deflate',
        'Vary': 'Accept-Encoding',
        'Warning': warning,
    }
    self.assertEqual(expected_headers, sent_headers)
    # wbits=15 makes zlib expect a zlib header and trailer.
    self.assertEqual(b'someextradata', zlib.decompress(payload, 15))
@mock.patch('compression.maybe_decode')
def test_call_content_encoding_chunked(self, mock_decode):
    """Call the middleware in content-encoding mode with a longer body.

    ``compression.maybe_decode`` is replaced by a mock for the duration
    of the test. Only the arguments passed to ``start_response`` are
    inspected: the headers are expected to carry ``Content-Encoding``,
    ``Vary`` and ``Warning`` but no ``Content-Length``. The returned
    body iterable is not examined.
    """
    def fake_app(environ, start_response):
        fake_app.calls.append((environ, start_response))
        start_response('599 BAZ', [('Content-Type', 'text/plain')])
        repeated = [b'lots', b'and'] * 10
        return repeated + [b'lots', b'of', b'extra', b'data']
    fake_app.calls = []

    middleware = self._make_mware(
        {'use_transfer_encoding': False}, fake_app)
    request_headers = {'content-encoding': 'gzip',
                       'accept-encoding': 'gzip'}
    environ = _fake_req(request_headers).environ
    start_response_mock = mock.Mock()
    # The returned iterable is deliberately not kept.
    middleware(environ, start_response_mock)

    # One recorded call on the patched decoder, and one on start_response.
    self.assertEqual(1, len(mock_decode.mock_calls))
    self.assertEqual(1, len(start_response_mock.mock_calls))

    # A recorded call is indexable as (name, args, kwargs).
    recorded = start_response_mock.mock_calls[0]
    positional, keyword = recorded[1], recorded[2]
    self.assertEqual(2, len(positional))
    self.assertEqual(0, len(keyword))
    self.assertEqual('599 BAZ', positional[0])

    sent_headers = dict(positional[1])
    self.assertNotIn('Content-Length', sent_headers)
    warning = ('214 proxy-server:compression '
               '"Content-Encoding modified"')
    expected_headers = {
        'Content-Type': 'text/plain',
        'Content-Encoding': 'gzip',
        'Vary': 'Accept-Encoding',
        'Warning': warning,
    }
    self.assertEqual(expected_headers, sent_headers)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment