Skip to content

Instantly share code, notes, and snippets.

@adiroiban
Last active October 21, 2020 13:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save adiroiban/7f593d6d18113aae797ad081e07f4745 to your computer and use it in GitHub Desktop.
Save adiroiban/7f593d6d18113aae797ad081e07f4745 to your computer and use it in GitHub Desktop.
Twisted support for parsing multipart/form-data POST requests
# Copyright (c) 2014 Adi Roiban.
# See LICENSE for details.
"""
Code for handling data submitted via HTTP forms.

We should try to keep this code independent of the Chevah project, as we plan
to send it upstream to Twisted.
"""
from StringIO import StringIO
from twisted.internet.defer import Deferred
from twisted.internet.interfaces import IConsumer
from twisted.protocols.basic import LineReceiver
from werkzeug.http import parse_options_header
from zope.interface import implementer
class FormLengthExceededException(Exception):
    """
    Marker exception raised when received form data exceeds length limits.

    It carries no extra state; the human readable reason is passed as the
    regular exception message.
    """
@implementer(IConsumer)
class MultiPartFormData(LineReceiver, object):
    """
    Accumulate multipart/form-data content delimited by `boundary` and call
    `open_callback` for each part.

    `open_callback` is called as `open_callback(headers, raw_headers)` once
    all the headers of a part were received.
    When `open_callback` returns None, part content is stored in memory.
    For non-None values part content is sent to the returned stream.
    The returned stream is used through `write`, `close`, `closed` and
    `interrupt`.

    A single text line in the form request (excluding binary data) as well as
    total length for a text only part should be smaller than MAX_LENGTH.

    This is a partial implementation which only handles headers parsing. Form
    data is still parsed using old in-memory code.
    This is why it also provides file object methods.
    """

    _STATE_START = 'start'
    _STATE_CONTENT = 'content_text'
    _STATE_HEADER = 'header'
    _STATE_END = 'end'

    #: State inside form part.
    _state = _STATE_START
    #: Buffer used to check boundary inside a streamed part.
    _stream_boundary_buffer = b''
    #: IUploadStream used for sending part content.
    _stream = None

    def __init__(self, boundary, open_callback, maximum_length=None):
        """
        Prepare a parser for a form whose parts are separated by `boundary`.

        `boundary` is the bare boundary token, without the `--` prefix.
        `open_callback` is called with `(headers, raw_headers)` for each part.
        `maximum_length`, when not None, overwrites MAX_LENGTH.
        """
        self._start_boundary = b'--%s' % boundary
        self._end_boundary = b'--%s--' % boundary
        # In raw mode the boundary includes the starting new line delimiter.
        self._raw_boundary = self.delimiter + self._start_boundary
        if maximum_length is not None:
            self.MAX_LENGTH = maximum_length
        self._open_callback = open_callback
        self._deferred = Deferred()
        self._producer = None
        self._resetPartState()

    def registerProducer(self, producer, streaming=True):
        """
        Signal that we are receiving data from a streamed request.

        Only stream producer is supported.
        """
        self._producer = producer
        self._resetInternalState()

    def unregisterProducer(self):
        """
        Called when all data was received.
        """
        self._producer = None
        self.close()

    @property
    def deferred(self):
        """
        Deferred which is called when form parsing is done.

        Return `None` on success or a failure on errors.
        """
        return self._deferred

    def _fail(self, error):
        """
        Errback the result deferred with `error`, unless it was already fired.

        Length errors can be detected more than once for the same request
        and firing a deferred twice raises AlreadyCalledError.
        """
        if self._deferred.called:
            return
        self._deferred.errback(error)

    def _resetInternalState(self):
        """
        Does a reinitialization of global state.
        """
        self._state = self._STATE_START
        self._content = StringIO()
        self._part = StringIO()
        self._resetPartState()

    def _resetPartState(self):
        """
        Initialize state for parsing a new part.
        """
        self._headers = {}
        self._headers_raw = []
        # If file was not closed yet, then something went wrong and we use
        # a special method on the stream.
        if self._stream and not self._stream.closed:
            self._stream.interrupt(Exception(
                'Unexpected end of form part.'))
        self._stream = None

    def write(self, data):
        """
        Called by transport when content is received.

        Translates into dataReceived, the expected entry method for a Protocol.
        """
        self.dataReceived(data)

    def connectionMade(self):
        """
        See: Protocol.
        """
        self._resetInternalState()

    def connectionLost(self, reason=None):
        """
        See: Protocol.
        """
        self._resetInternalState()
        # Deferred might be already called in case of errors.
        if not self._deferred.called:
            self._deferred.callback(None)

    def lineReceived(self, line):
        """
        See: LineReceiver.

        Dispatch to the `_form_STATE` method matching the current state.
        """
        state_method = getattr(self, '_form_%s' % self._state)
        state_method(line)

    def rawDataReceived(self, data):
        """
        See: LineReceiver.

        Forward part content to the stream until the boundary is found, then
        close the stream and continue in line mode with the remaining data.
        """
        found, raw, text = self._checkBoundary(data)
        if raw:
            self._stream.write(raw)

        if found:
            self._stream.close()
            self.setLineMode(extra=text)

    def sendLine(self, line):
        """
        See: LineReceiver.
        """
        raise RuntimeError(u'This is a read-only protocol.')

    def lineLengthExceeded(self, line):
        """
        See: LineReceiver.
        """
        self._fail(FormLengthExceededException(u'Form line too big.'))

    def _checkBoundary(self, data):
        """
        Look for boundary in raw data.

        Return a tuple with (found, raw, text).

        `found` is True if boundary was reached.
        `raw` is raw part from data.
        `text` is plain text part after raw data, including boundary.
        """
        found = False
        raw = b''
        text = b''
        boundary_length = len(self._raw_boundary)
        search_buffer = self._stream_boundary_buffer + data
        boundary_position = search_buffer.find(self._raw_boundary)

        if boundary_position > -1:
            found = True
            raw = search_buffer[:boundary_position]
            text = search_buffer[boundary_position:]
            # Everything buffered was handed over as `raw` or `text`.
            self._stream_boundary_buffer = b''
        elif len(search_buffer) <= boundary_length:
            # Search buffer is still not big enough.
            # No new raw data is produced.
            self._stream_boundary_buffer = search_buffer
        else:
            # Keep buffer to minimum size: only the tail which could be the
            # beginning of a boundary split between two chunks.
            buffer_start = len(search_buffer) - boundary_length + 1
            raw = search_buffer[:buffer_start]
            self._stream_boundary_buffer = search_buffer[buffer_start:]

        return (found, raw, text)

    def _form_start(self, line):
        """
        State before any boundary was received.

        It can only change into `header` state.
        """
        # Ignore all data until we get the start marker.
        if line != self._start_boundary:
            return

        self._updateLine(line)
        self._state = self._STATE_HEADER

    def _form_end(self, line):
        """
        State after end boundary was received.
        """
        # Ignore data.
        pass

    def _form_header(self, line):
        """
        State while receiving header content.

        It should normally change into `content` state, but on errors
        is reset into `start` state.
        """
        self._updateLine(line)

        if not line:
            # Header ends / content starts with an empty line.
            self._initializeContent()
        elif line == self._start_boundary:
            # A part without end and content.
            self._finalizePart()
        elif line == self._end_boundary:
            # A part without content.
            # Write it as it is.
            self._finalizePart(end=True)
        else:
            update_headers(self._headers, line)
            self._headers_raw.append(line)

    def _initializeContent(self):
        """
        Prepare state for incoming part content.

        Ask `open_callback` for a stream and switch to raw mode when one is
        returned.
        """
        self._state = self._STATE_CONTENT
        raw_headers = (
            self.delimiter.join(self._headers_raw) +
            self.delimiter +
            self.delimiter)
        self._stream = self._open_callback(self._headers, raw_headers)

        if self._stream:
            self.setRawMode()
            self._stream_boundary_buffer = b''

    def _form_content_text(self, line):
        """
        State while receiving text content.

        Raw content is received in rawDataReceived().

        It can only go into `start` state.
        """
        self._updateLine(line)

        if line == self._end_boundary:
            self._finalizePart(end=True)
        elif line == self._start_boundary:
            # We got a start marker before end marker.
            self._finalizePart()
        # For any other line, continue reading content.

    def _updateLine(self, line):
        """
        Update part with line content.

        The line is dropped and the deferred is errbacked when adding it
        would make the current part longer than MAX_LENGTH.
        """
        if (self._part.len + len(line)) > self.MAX_LENGTH:
            self._fail(FormLengthExceededException(u'Form part too big.'))
            return

        self._part.write(line + self.delimiter)

    def _finalizePart(self, end=False):
        """
        Write current parsed part.

        When `end` is True no other parts are expected.
        """
        self._content.write(self._part.getvalue())
        self._part = StringIO()

        if end:
            self._state = self._STATE_END
        else:
            self._state = self._STATE_HEADER

        self._resetPartState()

    def seek(self, position, semantic=0):
        """
        Proxy to internal StringIO.

        `semantic` defaults to 0 (absolute positioning), as for file objects.
        """
        return self._content.seek(position, semantic)

    def read(self):
        """
        Proxy to internal StringIO.
        """
        return self._content.read()

    def readline(self, limit=None):
        """
        Proxy to internal StringIO.
        """
        return self._content.readline(limit)

    def close(self):
        """
        Clean up internal state.
        """
        self.connectionLost()
def update_headers(headers, line):
    """
    Parse a raw header `line` and store the result in the `headers` dict.

    The key is the lowercase header name, with surrounding spaces removed.
    The stored value is a dict with the `value` and `options` returned by
    `parse_options_header` for the text found after the first `:`.

    When the line is invalid and does not contain a `:` delimiter, the
    whole line is used as name and the header content is empty.
    """
    raw_name, _, content = line.partition(':')
    value, options = parse_options_header(content)
    headers[raw_name.strip().lower()] = {'value': value, 'options': options}
class RESTFolder(ChevahResource):
    """
    REST resource used for listing the content of a folder.
    """

    def __init__(self, avatar):
        self._avatar = avatar
        self._segments = []
        self._uploaded_files = []
        super(RESTFolder, self).__init__()

    def headersReceived(self, request):
        """
        See: `IChevahResource`.

        Return `_CONTINUE` when the request can go on, or a
        `(http.BAD_REQUEST, message)` tuple for a form POST without boundary.
        """
        is_post = request.method == 'POST'
        if not is_post:
            # Default content handler and request size are used.
            return _CONTINUE

        mime_type, _ = request.getContentType()
        if mime_type == TYPE_NAME.APPLICATION_JSON:
            # Default limits are accepted for JSON.
            return _CONTINUE

        # Besides JSON, POST only supports the multipart/form-data format.
        boundary = request.getContentBoundary()
        if boundary:
            self._setPOSTMultipartFormHandler(request, boundary)
            # Requests of any size are accepted at this point. Limits are
            # enforced later by the request and the form handler.
            return _CONTINUE

        error_text = u'Request does not contain a form boundary.'
        self._addAlert(
            request=request,
            type=u'error',
            message=error_text,
            prevent_duplicates=True,
            )
        return (http.BAD_REQUEST, error_text)

    def _setPOSTMultipartFormHandler(self, request, boundary):
        """
        Replace the request content with the streamed form handler.
        """
        # FIXME:2090:
        # Implement a factory to avoid passing the request in this way.
        self._request = request

        form_parser = MultiPartFormData(
            boundary=boundary, open_callback=self._cbOpenPart)
        request.content = form_parser
        form_parser.makeConnection(request.transport)
        form_parser.deferred.addErrback(self._ebUploadForm, request)

        # The body of a POST upload request has no limit.
        # Some limits are imposed by the form parser.
        request.maximumBodyLength = None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment