@mitsuhiko
Last active October 29, 2020 10:26

wsgi.input_terminated Proposal

Problem Description

Currently WSGI servers and middlewares cannot accept chunked requests or perform request filtering, because a WSGI application has to depend on the content length: it is the only mechanism the specification currently guarantees will work.

Essentially this means that every WSGI setup is currently in violation of HTTP/1.1, which requires chunked requests to be supported.

Different WSGI servers solve this problem differently, so this specification proposes two pre-defined setups that support chunked requests. One is the safe behavior that PEP 333 and PEP 3333 de facto provide; the other is a new usage mode that is already the de facto implementation in most WSGI servers to date.

Proposal

A two-step proposal to fix the inconsistent behavior of input streams in WSGI.

WSGI servers have two options for providing the input stream:

  1. Provide wsgi.input as the socket file, unchanged. This means that wsgi.input_terminated is set to False or not added to the WSGI environ at all. In that case the WSGI application is required to look at CONTENT_LENGTH and only read up to that point. Applications in that case are required to count the bytes received and compare the total against the content length. If less than CONTENT_LENGTH is received, the application must respond to this in order to not accidentally assume a regular end of the stream has been reached.
  2. Provide wsgi.input as an end-of-file terminated stream. In that case wsgi.input_terminated is set to True and an app is required to read to the end of the file and disregard CONTENT_LENGTH for reading.

    In addition to that, the server is also required to raise an IOError if the client disconnects without transmitting all the data, as it would otherwise not be possible for the application to reliably detect a disconnect.

    This is already what many WSGI servers do, but an application has had no way to detect that behavior until now.

Pseudocode for a WSGI implementation:

def get_input_stream(environ):
    stream = environ['wsgi.input']

    # This part is new: a terminated stream can be returned as-is.
    if environ.get('wsgi.input_terminated'):
        return stream

    # This part was necessary before anyways to not accidentally
    # read past the length of the stream.
    return wrap_stream(stream, environ.get('CONTENT_LENGTH'))
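The pseudocode above can be exercised end to end. The following is a hedged, runnable sketch: wrap_stream here is a deliberately naive stand-in that just slices off CONTENT_LENGTH bytes, not the full LimitedStream treatment shown later in this document.

```python
import io

# Naive stand-in for wrap_stream: slice off CONTENT_LENGTH bytes.
# (A real implementation needs the LimitedStream treatment below.)
def wrap_stream(stream, content_length):
    return io.BytesIO(stream.read(int(content_length or 0)))

def get_input_stream(environ):
    stream = environ['wsgi.input']
    # New in this proposal: a terminated stream can be used as-is.
    if environ.get('wsgi.input_terminated'):
        return stream
    # Otherwise only CONTENT_LENGTH bytes belong to the request body.
    return wrap_stream(stream, environ.get('CONTENT_LENGTH'))

# Terminated stream: read to EOF, CONTENT_LENGTH is disregarded.
env = {'wsgi.input': io.BytesIO(b'chunked body'),
       'wsgi.input_terminated': True}
assert get_input_stream(env).read() == b'chunked body'

# Raw socket-style stream: bytes past CONTENT_LENGTH are not consumed.
env = {'wsgi.input': io.BytesIO(b'bodyEXTRA'), 'CONTENT_LENGTH': '4'}
assert get_input_stream(env).read() == b'body'
```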

The only thing that needs to change in the WSGI server is either nothing at all (for instance wsgiref, or any other simple WSGI server that just passes the socket through) or, for a server like mod_wsgi or gunicorn that terminates the input stream, setting wsgi.input_terminated to True when building the WSGI environ.
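To illustrate how little that is, here is a hypothetical sketch (make_environ and its headers dict are made up for this example, not code from any actual server) of a server whose input stream is already EOF-terminated:

```python
# Hypothetical server-side sketch: building the WSGI environ for a
# server whose input stream is already EOF-terminated.
def make_environ(body_stream, headers):
    environ = {
        'wsgi.input': body_stream,
        'REQUEST_METHOD': headers.get('method', 'GET'),
    }
    if 'content-length' in headers:
        environ['CONTENT_LENGTH'] = headers['content-length']
    # The one new thing this proposal asks of such a server:
    environ['wsgi.input_terminated'] = True
    return environ
```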

FAQ

Some common questions that came up during this proposal.

I never wrapped my input streams before, why/how do I do that?

This is actually quite tricky due to a bunch of ugly warts in Python's stream system. There is not much you can do about it other than carefully wrapping the input stream. The implementation below is similar to what Werkzeug does. A lot of WSGI implementations currently do not properly wrap the input stream and already assume that wsgi.input_terminated is effectively set.

In addition to wrapping the stream so that it does not read past the content length, it's also crucial to detect disconnected clients, which is indicated by less data than the content length being received.

Here is how Werkzeug wraps the stream:

def wrap_stream(input_stream, content_length):
    return LimitedStream(input_stream, max(0, int(content_length or 0)))


class LimitedStream(object):

    def __init__(self, stream, limit):
        self._read = stream.read
        self._readline = stream.readline
        self._pos = 0
        self._limit = limit

    def read(self, size=None):
        if self._pos >= self._limit:
            return b''
        if size is None or size < 0:
            size = self._limit
        to_read = min(self._limit - self._pos, size)
        read = self._read(to_read)
        if to_read and len(read) != to_read:
            raise IOError('The client went away')
        self._pos += len(read)
        return read

    def readline(self, size=None):
        if self._pos >= self._limit:
            return b''
        if size is None:
            size = self._limit - self._pos
        else:
            size = min(size, self._limit - self._pos)
        line = self._readline(size)
        if size and not line:
            raise IOError('The client went away')
        self._pos += len(line)
        return line

    def readlines(self, size=None):
        last_pos = self._pos
        result = []
        if size is not None:
            end = min(self._limit, last_pos + size)
        else:
            end = self._limit
        while 1:
            if size is not None:
                size -= last_pos - self._pos
            if self._pos >= end:
                break
            result.append(self.readline(size))
            if size is not None:
                last_pos = self._pos
        return result

    def __next__(self):
        line = self.readline()
        if not line:
            raise StopIteration()
        return line

    def __iter__(self):
        return self

    # Python 2 support
    next = __next__
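To make the behavior concrete, here is a small self-contained demo. It repeats a condensed, read-only version of the LimitedStream above (just the read method) so that it can run standalone, and shows both the truncation case and the disconnect case:

```python
import io

# Condensed read-only copy of the LimitedStream above, just enough to
# demonstrate truncation at the limit and disconnect detection.
class LimitedStream(object):

    def __init__(self, stream, limit):
        self._read = stream.read
        self._pos = 0
        self._limit = limit

    def read(self, size=None):
        if self._pos >= self._limit:
            return b''
        if size is None or size < 0:
            size = self._limit
        to_read = min(self._limit - self._pos, size)
        read = self._read(to_read)
        if to_read and len(read) != to_read:
            raise IOError('The client went away')
        self._pos += len(read)
        return read

# Well-behaved client: the body matches the declared content length;
# the wrapper never reads past it, so pipelined data stays untouched.
stream = LimitedStream(io.BytesIO(b'hello world<next request>'), 11)
assert stream.read() == b'hello world'
assert stream.read() == b''

# Disconnected client: fewer bytes than CONTENT_LENGTH ever arrive.
try:
    LimitedStream(io.BytesIO(b'hel'), 11).read()
except IOError:
    pass
else:
    raise AssertionError('expected IOError for short body')
```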

How do I use this in a middleware?

Generally the answer is you don't. Filterings should be done on the webserver and that's the whole point of this proposal as it makes filtering on the server possible.

However it does now also allow you to do it in a middleware if you absolutely need to do that. This can for instance be useful if you need to do demasking or custom decryption on the input data stream.

Here is an example of how you can implement unzipping in a middleware:

import zlib

class UnzippingStream(object):

    def __init__(self, stream):
        self._stream = stream
        self._decomp = zlib.decompressobj()
        self._buf_out = bytearray()
        self._hit_eof = False

    def _feed(self, read_all=False):
        if self._hit_eof:
            return True
        if read_all:
            input_data = self._stream.read()
        else:
            input_data = self._stream.read(2048)
        new_in = self._decomp.unconsumed_tail + input_data
        self._buf_out += self._decomp.decompress(bytes(new_in))
        if not input_data:
            self._buf_out += self._decomp.flush()

        self._hit_eof = not input_data
        return self._hit_eof

    def _fetch(self, length=None):
        rv = self._buf_out[:length]
        del self._buf_out[:length]
        return bytes(rv)

    def read(self, size=None):
        if size is None:
            self._feed(True)

        was_empty = False
        while 1:
            if size is None:
                return self._fetch()
            elif was_empty or len(self._buf_out) >= size:
                return self._fetch(size)
            was_empty = not self._feed()

    def readline(self, size=None):
        while 1:
            was_empty = self._feed()
            rv = self._buf_out.splitlines(True)
            if rv and (was_empty or
                       rv[0].endswith(b'\r\n') or \
                       len(rv) > 1 and rv[0].endswith((b'\r', b'\n'))):
                return self._fetch(len(rv[0]))
            if was_empty:
                return b''

    def readlines(self):
        result = []
        while 1:
            rv = self.readline()
            if not rv:
                break
            result.append(rv)
        return result


class UnzippingMiddleware(object):

    def __init__(self, app):
        self.app = app

    def __call__(self, environ, start_response):
        # Fetch the input stream.  See function definition above for
        # more information.
        input_stream = get_input_stream(environ)

        # Wrap the stream.  Since this changes the content length we
        # now need to inform the application that the input is
        # terminated and that the content length is only a hint now.
        environ['wsgi.input'] = UnzippingStream(input_stream)
        environ['wsgi.input_terminated'] = True

        # Invoke the application.
        return self.app(environ, start_response)
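As a sanity check of the decompression approach used by UnzippingStream, here is a self-contained round trip of the underlying zlib mechanics: compressed input is fed through a decompressobj in small chunks, with flush() called at end of stream, and the original payload comes back out.

```python
import io
import zlib

payload = b'hello chunked world ' * 500
compressed = io.BytesIO(zlib.compress(payload))

# Feed the compressed stream through a decompressobj in small pieces,
# mirroring what UnzippingStream._feed does per read() call.
decomp = zlib.decompressobj()
out = bytearray()
while True:
    chunk = compressed.read(64)
    if not chunk:
        # End of input: flush whatever the decompressor still buffers.
        out += decomp.flush()
        break
    out += decomp.decompress(chunk)

assert bytes(out) == payload
```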

Why not a separate stream?

Why would you want a boolean marker instead of a separate stream object? Primarily because it keeps the WSGI environ nice and tidy: middlewares just need to flip the input-terminated flag when they wrap the stream. It also means the proposal is fully backwards compatible, with the flag only marking the edge cases. Input stream handling is already pretty complicated for both server and application, and this proposal just fixes the edge cases more consistently.

Hopefully, going forward, most WSGI servers will set the flag to True, as this removes complexity from the WSGI application. Until that is the case, however, applications will need to support the worst case of the flag not being set, in which case they are required to work within the vague wording of the original specification.

In addition to that, a boolean flag can be injected very easily from the outside, so mod_wsgi for instance could already support this proposal by just setting it from the Apache config.

What if my middleware depends on CONTENT_LENGTH?

This proposal adds new functionality: namely that a server can finally accept chunked requests and do request content filtering. Your middleware will need to be fixed to support that. That said, this proposal does not remove functionality. Previously, if you sent a chunked request to a WSGI server, it would most likely have timed out or failed in some other way. If your middleware continues to rely on a content length that might be incorrect, the same behavior is restored: your application will not respond properly to such a request.

Yes: such middlewares will indeed need to honor the termination flag now, but on the other hand this is not a regression or a new problem. If anything, it just means that unless your whole stack upgrades you won't benefit from the new flag.

@mcdonc

mcdonc commented Jun 6, 2013

A few issues that I see:

Provide wsgi.input as an end-of-file terminated stream. In that case wsgi.input_terminated is set to True and an app is required to read to the end of the file and disregard CONTENT_LENGTH for reading.

"Disregard" seems inaccurate given that the following paragraph requires clients to consider CONTENT_LENGTH:

Applications in that case are required to count up the bytes received and comparing it against the content length. If less than CONTENT_LENGTH is received the application must respond to this in order to not accidentally assume a regular end of the stream has been reached.

What does "respond to this" mean above?

Additionally, existing middlewares which wrap wsgi.input won't play nicely downstream of servers/middleware which set wsgi.input_terminated. They will not check for wsgi.input_terminated and they will presume that CONTENT_LENGTH is reliable. This might be fine, but it's not entirely backwards compatible as claimed in the last paragraph.

@mcdonc

mcdonc commented Jun 6, 2013

Concrete example of existing middleware that probably won't play nicely if CONTENT_LENGTH is unreliable:

https://github.com/repoze/repoze.retry/blob/master/repoze/retry/__init__.py#L50

@mitsuhiko
Author

This proposal does not change the reliability of the content length. The content length only becomes unreliable under filtering situations, which already made the content length unavailable. With this proposal the middleware finally has a chance to do filtering without breaking. If you ran that middleware on Apache with gzip in front of it, it would already have failed.
