Skip to content

Instantly share code, notes, and snippets.

@MetalBeetle
Created January 28, 2011 20:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MetalBeetle/800859 to your computer and use it in GitHub Desktop.
Save MetalBeetle/800859 to your computer and use it in GitHub Desktop.
_parse_stream is a generator that produces (name, data, header-dict) tuples. Needs a bit of wrapping to get the stream from the environment, but otherwise done. Doesn't really do any error handling yet.
"""Parser for multipart/form-data."""
import re
import tempfile
import os
class RequestFile:
    """
    Record describing a single uploaded file.

    Instance variables:
    - original_name: The filename the client supplied for the upload (intended to be the
      basename only, without directories).
    - temp_path: Where the temporary copy of the upload lives on disk.
    - size: Number of bytes in the file.
    """

    __slots__ = ("original_name", "temp_path", "size",)

    def __init__(self, original_name, temp_path, size):
        """Stores the given name, path and size on the new instance."""
        self.original_name, self.temp_path, self.size = original_name, temp_path, size
def find_newline(data, start, end):
    """
    Locates the first line terminator (CRLF or LF) in data[start:end].

    A lone CR is deliberately not treated as a terminator: it is invalid, and it cannot be told
    apart from the first half of a CRLF whose LF has not arrived yet.

    Returns a (position, length) tuple, where length is 2 for CRLF and 1 for LF. When no
    terminator is present the result is (-1, 1).
    """
    lf_pos = data.find(b'\n', start, end)
    crlf_pos = data.find(b'\r\n', start, end)
    # A CRLF only wins when it starts before the first LF; otherwise a bare LF comes first.
    if crlf_pos != -1 and crlf_pos < lf_pos:
        return (crlf_pos, 2)
    return (lf_pos, 1)
class FilePartHandler:
    """
    Part handler that writes an uploaded file to a temporary file on disk.

    A handler accepts a part only when its Content-Disposition header carries a ``filename``
    parameter. Data is then streamed to the temporary file through process_data(), and finish()
    returns a RequestFile describing the result.
    """

    def __init__(self):
        # Both stay None until accept() has claimed a part.
        self.request_file = None
        self.request_file_handle = None

    def accept(self, headers, upload_dir):
        """
        Claims the part if it is a file upload.

        headers is the parsed header dict of the part; upload_dir is the directory in which the
        temporary file is created. Returns True (and opens the temporary file) when the part has
        a filename, False otherwise.
        """
        disposition = headers.get("Content-Disposition", {})
        if "filename" not in disposition:
            return False
        # The name comes from the client and may be a full path using either separator. Keep the
        # last component only, so it cannot introduce directories into the temporary file path
        # and original_name really is a basename.
        filename = disposition["filename"].replace("\\", "/").rsplit("/", 1)[-1]
        fd, path = tempfile.mkstemp(suffix=filename, dir=upload_dir)
        self.request_file_handle = os.fdopen(fd, 'w+b')
        self.request_file = RequestFile(filename, path, 0)
        return True

    def process_data(self, data, start, end):
        """Appends data[start:end] to the temporary file and updates the recorded size."""
        # Use a memory view to prevent copying of data when slicing.
        self.request_file_handle.write(memoryview(data)[start:end])
        self.request_file.size += end - start

    def finish(self):
        """Closes the temporary file and returns the RequestFile describing it."""
        self.request_file_handle.close()
        return self.request_file

    def cancel(self):
        """Closes and deletes the temporary file, if one was opened."""
        if self.request_file_handle is not None:
            self.request_file_handle.close()
        if self.request_file is not None:
            os.remove(self.request_file.temp_path)
class FieldPartHandler:
    """
    Fallback part handler that collects an ordinary form field in memory.

    It accepts every part, accumulates the raw bytes passed to process_data(), and decodes them
    to a string in finish().
    """

    def __init__(self):
        # A bytearray grows in place, so a value that arrives in many pieces is not re-copied
        # on every call, and finish() works even if no data was ever received.
        self.buffer = bytearray()

    def accept(self, headers, upload_dir):
        """Always returns True; this handler is the fallback for any part."""
        # Fallback
        return True

    def process_data(self, data, start, end):
        """Appends data[start:end] to the collected field value."""
        self.buffer += memoryview(data)[start:end]

    def finish(self):
        """Returns the collected value decoded with the default codec (UTF-8)."""
        return self.buffer.decode()
class _Headers:
    """
    Incremental parser for a block of header lines terminated by a blank line.

    Data is fed in arbitrary pieces through process_data(). Parsed headers are collected in the
    ``headers`` dict, which maps each header name to a dict of its arguments; the main value of
    the header is stored under the empty-string key.
    """

    # Header name, colon, and the main value (up to the first space or semicolon).
    _NAME_RE = re.compile(r" *([^ :]*) *: *([^ ;]*)")
    # One `; key=value` parameter. A fully quoted value may contain spaces and semicolons; an
    # unquoted value (or one with a stray opening quote) ends at whitespace, `;` or `"`.
    # Backslashes are kept literally, since some clients send unescaped Windows paths.
    _PARAM_RE = re.compile(r';\s*([^\s=;]+)\s*=\s*(?:"([^"]*)"|"?([^\s;"]*))')

    def __init__(self, max_header_line_length):
        self.headers = {}
        # Bytes of a header line whose terminating newline has not been seen yet.
        self.buffer = b''
        self.header_lines_read = 0
        self.max_header_line_length = max_header_line_length
        # Length of the newline sequence that ended the most recent complete line.
        self.newline_type = 2

    def newline(self):
        """Returns the newline sequence (LF or CRLF) that ended the last complete header line."""
        return (b'', b'\n', b'\r\n')[self.newline_type]

    def process_data(self, data, start, end):
        """
        Processes data[start:end] into header information. Returns None if more data is
        required, or an offset into data for the start of the non-consumed data if the headers
        are done.

        Raises ValueError if a header line exceeds max_header_line_length.
        """
        while True:
            nl, nl_length = find_newline(data, start, end)
            if nl == -1:
                # We don't have enough information to construct a complete header line.
                if len(self.buffer) + end - start > self.max_header_line_length:
                    raise ValueError("Maximum multipart header line length {0} exceeded.".format(
                        self.max_header_line_length))
                self.buffer += data[start:end]
                return None

            # We have a complete header line.
            if len(self.buffer) + nl - start > self.max_header_line_length:
                raise ValueError("Maximum multipart header line length {0} exceeded.".format(
                    self.max_header_line_length))
            raw_line = self.buffer + data[start:nl]
            self.buffer = b''
            self.newline_type = nl_length
            if nl_length == 1 and raw_line.endswith(b'\r'):
                # The CR of a CRLF arrived in an earlier piece of data than its LF, so it ended
                # up in the buffer. It belongs to the newline, not to the line.
                raw_line = raw_line[:-1]
                self.newline_type = 2
            self.header_lines_read += 1
            start = nl + nl_length

            header_line = raw_line.decode()
            if header_line:
                self.parse_header_line(header_line)
            elif self.header_lines_read > 1:
                # End of headers.
                return start
            # Otherwise: a blank line at the very start. Ignore it and keep reading.

    def parse_header_line(self, header_line):
        """
        Parses a multipart header line and stores the result in self.headers under the header
        name. Lines that do not look like a header are ignored. For example, the header line
            Content-Disposition: form-data; name="image"; filename="botg.txt"
        is stored as
            self.headers['Content-Disposition'] ==
                {'': 'form-data', 'name': 'image', 'filename': 'botg.txt'}
        """
        name_m = self._NAME_RE.match(header_line)
        if name_m is None:
            return
        line_info = {"": name_m.group(2)}
        for param in self._PARAM_RE.finditer(header_line, name_m.end()):
            quoted, bare = param.group(2), param.group(3)
            line_info[param.group(1)] = quoted if quoted is not None else bare
        self.headers[name_m.group(1)] = line_info
class _Part:
    """
    One part of a multipart body: its headers plus the handler that consumes its content.

    The part first feeds incoming data to a header parser. Once the headers are complete, the
    first handler class whose instance accepts them is chosen, and all further data goes to it.
    """

    def __init__(self, upload_dir, max_header_line_length, custom_handlers=()):
        self.upload_dir = upload_dir
        self.headers = _Headers(max_header_line_length)
        self.handler = None
        # Custom handlers are tried first. tuple() lets callers pass a list or other iterable.
        self.available_handlers = tuple(custom_handlers) + (FilePartHandler, FieldPartHandler)

    def process_data(self, data, start, end):
        """Feeds data[start:end] to the header parser or, once chosen, to the handler."""
        if self.handler is not None:
            self.handler.process_data(data, start, end)
            return

        body_start = self.headers.process_data(data, start, end)
        if body_start is None:
            # Headers are still incomplete; wait for more data.
            return

        for handler_class in self.available_handlers:
            candidate = handler_class()
            if candidate.accept(self.headers.headers, self.upload_dir):
                self.handler = candidate
                break
        # Process the rest of the data into the handler.
        self.handler.process_data(data, body_start, end)

    def result(self):
        """
        Finishes the part and returns a (name, value, header-dict) tuple, where value is
        whatever the handler's finish() returns.

        Raises KeyError if the part has no Content-Disposition header with a name argument.
        """
        return (self.headers.headers["Content-Disposition"]["name"], self.handler.finish(),
                self.headers.headers)

    def cancel(self):
        """Lets the handler discard partial results, if it has a cancel() method."""
        if self.handler is not None and hasattr(self.handler, "cancel"):
            self.handler.cancel()
def _parse_stream(stream, upload_dir="/tmp", block_size=32768, max_header_line_length=1024, custom_handlers=()):
    """
    Parses the stream, yielding tuples of (name, value, header-dict).

    The stream must start with header lines that include a Content-Type header carrying a
    ``boundary`` argument, followed by a blank line and the multipart body. The body is assumed
    to start directly with the opening boundary line. Nothing is yielded for a part that is cut
    off by the end of the stream; such a part is cancelled instead.

    - stream: object with a read(size) method returning bytes (empty at end of stream).
    - upload_dir: directory handed to the part handlers for temporary files.
    - block_size: number of bytes requested per read.
    - max_header_line_length: limit applied to every header line.
    - custom_handlers: handler classes tried before the built-in ones.

    Raises KeyError if the main headers lack a Content-Type boundary.
    """
    custom_handlers = tuple(custom_handlers)

    # Parse the main headers to figure out what the boundary is.
    headers = _Headers(max_header_line_length)
    rest_of_data_start = None
    block = None
    while rest_of_data_start is None:
        block = stream.read(block_size)
        if not block:
            # The stream is empty.
            return
        rest_of_data_start = headers.process_data(block, 0, len(block))

    # Now we can determine what the boundary is.
    boundary = headers.newline() + b'--' + headers.headers["Content-Type"]["boundary"].encode()
    boundary_length = len(boundary)

    # The opening boundary line is "--boundary" + newline, which has the same length as the
    # newline + "--boundary" separator used between parts, so skip exactly that many bytes.
    pending = block[rest_of_data_start:]
    while len(pending) < boundary_length:
        chunk = stream.read(block_size)
        if not chunk:
            return
        pending += chunk
    position = boundary_length

    part = _Part(upload_dir, max_header_line_length, custom_handlers)
    try:
        while True:
            found = pending.find(boundary, position)
            if found != -1:
                # We've found the end of a part.
                part.process_data(pending, position, found)
                result = part.result()
                # Swap in the next part before yielding, so that cleanup after the consumer
                # abandons the generator never touches a result that was already handed out.
                part = _Part(upload_dir, max_header_line_length, custom_handlers)
                position = found + boundary_length
                yield result
                continue

            chunk = stream.read(block_size)
            if not chunk:
                # We're out of stream.
                part.cancel()
                return

            # No boundary in the data we hold. Everything except the last boundary_length - 1
            # bytes is certainly part content; that tail might be the beginning of a boundary
            # completed by later reads, so carry it over however short those reads are.
            flush_end = max(position, len(pending) - (boundary_length - 1))
            if flush_end > position:
                part.process_data(pending, position, flush_end)
            pending = pending[flush_end:] + chunk
            position = 0
    except BaseException:
        # Don't leave the temporary file of a half-read part behind.
        part.cancel()
        raise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment