Created
January 28, 2011 20:11
-
-
Save MetalBeetle/800859 to your computer and use it in GitHub Desktop.
_parse_stream is a generator that produces (name, data, header-dict) tuples. Needs a bit of wrapping to get the stream from the environment, but otherwise done. Doesn't really do any error handling yet.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Parser for multipart/form-data.""" | |
import re | |
import tempfile | |
import os | |
class RequestFile:
    """
    Record describing one uploaded file.

    Instance variables:
    - original_name: The filename given for the upload, stored exactly as passed in.
    - temp_path: The path to the temporary file on disk.
    - size: The size of the file, in bytes.
    """

    # Fixed attribute set; instances carry no per-instance __dict__.
    __slots__ = ("original_name", "temp_path", "size")

    def __init__(self, original_name, temp_path, size):
        """Stores the three values on the new instance unchanged."""
        self.original_name, self.temp_path, self.size = original_name, temp_path, size
def find_newline(data, start, end):
    """
    Locates the first line terminator (CRLF or LF) in data[start:end].

    A lone CR is never reported: it cannot be told apart from the first half of a CRLF.

    Returns a (position, length) tuple where length is 2 for CRLF and 1 for LF. When no
    terminator is present the result is (-1, 1).
    """
    lf_pos = data.find(b'\n', start, end)
    crlf_pos = data.find(b'\r\n', start, end)
    # A CRLF wins only when it begins before the first LF, i.e. that LF is its second byte.
    if crlf_pos != -1 and crlf_pos < lf_pos:
        return (crlf_pos, 2)
    return (lf_pos, 1)
class FilePartHandler:
    """
    Part handler that spools an uploaded file into a temporary file on disk.

    Instance variables:
    - request_file: The RequestFile describing the upload, or None until accept() succeeds.
    - request_file_handle: The open binary handle of the temporary file, or None.
    """

    def __init__(self):
        self.request_file = None
        self.request_file_handle = None

    def accept(self, headers, upload_dir):
        """
        Claims the part if its Content-Disposition header carries a filename.

        headers is the parsed header dict of the part; upload_dir is the directory in which the
        temporary file is created. Returns True when the part was claimed (the temporary file
        then exists and is open), False otherwise.
        """
        disposition = headers.get("Content-Disposition", {})
        if "filename" not in disposition:
            return False
        # The filename is client-supplied. Keep only its last path component (some browsers send
        # a full Windows path) so it can neither break nor redirect the temporary file creation.
        filename = os.path.basename(disposition["filename"].replace("\\", "/"))
        fd, path = tempfile.mkstemp(suffix=filename, dir=upload_dir)
        self.request_file_handle = os.fdopen(fd, 'w+b')
        self.request_file = RequestFile(filename, path, 0)
        return True

    def process_data(self, data, start, end):
        """Appends data[start:end] to the temporary file and adds its length to the size."""
        # Use a memory view to prevent copying of data when slicing.
        self.request_file_handle.write(memoryview(data)[start:end])
        self.request_file.size += end - start

    def finish(self):
        """Closes the temporary file and returns the RequestFile describing it."""
        self.request_file_handle.close()
        return self.request_file

    def cancel(self):
        """Closes the temporary file and deletes it from disk."""
        if self.request_file_handle is not None:
            self.request_file_handle.close()
        if self.request_file is not None:
            os.remove(self.request_file.temp_path)
class FieldPartHandler:
    """
    Fallback part handler that collects the part body in memory and returns it as text.

    Instance variables:
    - buffer: None until the first process_data() call, then a bytearray of the bytes so far.
    """

    def __init__(self):
        self.buffer = None

    def accept(self, headers, upload_dir):
        """Always returns True: this handler takes any part."""
        # Fallback
        return True

    def process_data(self, data, start, end):
        """Appends data[start:end] to the in-memory buffer."""
        if self.buffer is None:
            # A bytearray grows in place, so many small chunks do not cost quadratic copying.
            self.buffer = bytearray()
        self.buffer += memoryview(data)[start:end]

    def finish(self):
        """
        Returns the collected bytes decoded with the default codec. A part for which no data
        was ever processed yields the empty string.
        """
        if self.buffer is None:
            return ""
        return self.buffer.decode()
class _Headers:
    """
    Incremental parser for a block of header lines terminated by a blank line.

    Instance variables:
    - headers: Dict mapping header name to a dict of its value (under the key '') and arguments.
    - buffer: Bytes of an incomplete header line carried over from earlier chunks.
    - header_lines_read: Number of complete lines consumed so far, blank ones included.
    - max_header_line_length: Longest header line accepted, in bytes.
    - newline_type: Length of the most recently seen line terminator (1 for LF, 2 for CRLF).
    """

    # "Name: value" prefix of a header line; the value stops at the first space or semicolon.
    _NAME_RE = re.compile(r' *([^ :]*) *: *([^ ;]*)')
    # "key=value" argument. A properly quoted value may contain spaces; anything else falls back
    # to a bare token that ends at the first space or double quote.
    _ARG_RE = re.compile(r' *([^ =]*) *= *(?:"([^"]*)"|"?([^ "]*)"?)')

    def __init__(self, max_header_line_length):
        self.headers = {}
        self.buffer = b''
        self.header_lines_read = 0
        self.max_header_line_length = max_header_line_length
        self.newline_type = 2

    def newline(self):
        """Returns the byte sequence of the most recently seen line terminator."""
        return (b'', b'\n', b'\r\n')[self.newline_type]

    def _check_line_length(self, length):
        """Raises if a header line of the given byte length exceeds the configured maximum."""
        if length > self.max_header_line_length:
            raise Exception("Maximum multipart header line length {0} exceeded.".format(
                self.max_header_line_length))

    def process_data(self, data, start, end):
        """
        Processes data[start:end] into header information. Returns None if more data is
        required, or an offset for the start of the non-consumed data if the headers are done.

        Raises Exception when a header line is longer than max_header_line_length.
        """
        while True:
            nl, newline_length = find_newline(data, start, end)
            if nl == -1:
                # No complete line yet: stash what we have and wait for more. newline_type is
                # left alone so newline() keeps reporting the last terminator actually seen.
                self._check_line_length(len(self.buffer) + end - start)
                self.buffer += data[start:end]
                return None

            # We have a complete header line.
            self._check_line_length(len(self.buffer) + nl - start)
            line = self.buffer + data[start:nl]
            self.buffer = b''
            if newline_length == 1 and line.endswith(b'\r'):
                # The CR arrived at the end of the previous chunk and its LF at the start of
                # this one: it is a CRLF, so the CR is not part of the line.
                line = line[:-1]
                self.newline_type = 2
            else:
                self.newline_type = newline_length
            header_line = line.decode()
            self.header_lines_read += 1
            start = nl + newline_length

            if header_line:
                self.parse_header_line(header_line)
            elif self.header_lines_read > 1:
                # A blank line after at least one other line ends the headers.
                return start
            # Otherwise it is a blank line at the very start, which is ignored.

    def parse_header_line(self, header_line):
        """
        Parses a multipart header line and stores the result in self.headers under the header
        name. For example, the header line
            Content-Disposition: form-data; name="image"; filename="botg.txt"
        is stored as
            headers['Content-Disposition'] = {'':'form-data', 'name':'image', 'filename':'botg.txt'}

        Lines that do not look like "Name: value" are ignored. Returns nothing.
        """
        name_m = self._NAME_RE.match(header_line)
        if name_m is None:
            return
        line_info = {"": name_m.group(2)}
        # NOTE: arguments are split on every ';', so a quoted value containing ';' is cut short.
        for arg in header_line.split(";")[1:]:
            arg_m = self._ARG_RE.match(arg)
            if arg_m:
                quoted, bare = arg_m.group(2), arg_m.group(3)
                line_info[arg_m.group(1)] = quoted if quoted is not None else bare
        self.headers[name_m.group(1)] = line_info
class _Part:
    """
    One part of a multipart body: its headers plus the handler that consumes its data.

    Instance variables:
    - upload_dir: Directory passed to each handler's accept().
    - headers: The _Headers parser for this part.
    - handler: The handler instance that accepted the part, or None while the part's headers
      are still being read.
    - available_handlers: Handler classes tried in order; custom ones first, then
      FilePartHandler, then FieldPartHandler.
    """

    def __init__(self, upload_dir, max_header_line_length, custom_handlers=()):
        self.upload_dir = upload_dir
        self.headers = _Headers(max_header_line_length)
        self.handler = None
        # tuple() lets callers pass any iterable of handler classes (e.g. a list), not only a
        # tuple.
        self.available_handlers = tuple(custom_handlers) + (FilePartHandler, FieldPartHandler)

    def process_data(self, data, start, end):
        """
        Feeds data[start:end] to the part. Until the headers are complete the bytes go to the
        header parser; once they are, the first handler whose accept() returns True is chosen
        and receives all remaining data.
        """
        if self.handler is not None:
            self.handler.process_data(data, start, end)
            return

        rest_of_data_start = self.headers.process_data(data, start, end)
        if rest_of_data_start is None:
            # Headers are still incomplete; wait for more data.
            return

        for handler_class in self.available_handlers:
            instance = handler_class()
            if instance.accept(self.headers.headers, self.upload_dir):
                self.handler = instance
                break
        # Process the rest of the data into the handler.
        self.handler.process_data(data, rest_of_data_start, end)

    def result(self):
        """
        Returns a tuple of the part's Content-Disposition name, the value produced by the
        handler's finish(), and the part's header dict.
        """
        return (self.headers.headers["Content-Disposition"]["name"], self.handler.finish(),
                self.headers.headers)

    def cancel(self):
        """Calls the handler's cancel(), if a handler was chosen and it defines one."""
        cancel = getattr(self.handler, "cancel", None)
        if cancel is not None:
            cancel()
def _parse_stream(stream, upload_dir="/tmp", block_size=32768, max_header_line_length=1024, custom_handlers=()):
    """
    Parses the stream, yielding tuples of (name, value, header-dict).

    The stream is read block_size bytes at a time. It must begin with header lines, ended by a
    blank line, that include a Content-Type header with a boundary argument. Nothing is yielded
    if the stream runs out before those headers are complete. A part that is still unfinished
    when the stream ends is cancelled rather than yielded.
    """
    def new_part():
        return _Part(upload_dir, max_header_line_length, custom_handlers)

    # Read the leading headers; they tell us what the boundary is.
    top_headers = _Headers(max_header_line_length)
    while True:
        block = stream.read(block_size)
        if len(block) == 0:
            # The stream is empty.
            return
        body_offset = top_headers.process_data(block, 0, len(block))
        if body_offset is not None:
            break

    delimiter = top_headers.newline() + b'--' + top_headers.headers["Content-Type"]["boundary"].encode()
    delimiter_length = len(delimiter)

    # Skip over the first boundary. If that takes us past the end of this block, carry the
    # remainder of the offset over into a freshly read block.
    cursor = body_offset + delimiter_length
    if cursor >= len(block):
        cursor -= len(block)
        block = stream.read(block_size)

    part = new_part()
    while True:
        hit = block.find(delimiter, cursor)
        if hit != -1:
            # The current part ends inside this block.
            part.process_data(block, cursor, hit)
            yield part.result()
            part = new_part()
            cursor = hit + delimiter_length
            continue

        # No boundary in what is left of this block: it is either wholly in the next block,
        # straddles the two blocks, or the stream has run out.
        following = stream.read(block_size)
        if len(following) == 0:
            # We're out of stream.
            part.cancel()
            return

        # Look for a boundary whose head closes this block and whose tail opens the next one.
        split_at = next(
            (len(block) - head
             for head in range(1, delimiter_length)
             if block.endswith(delimiter[:head]) and following.startswith(delimiter[head:])),
            -1)
        if split_at == -1:
            # Everything left in this block belongs to the current part.
            part.process_data(block, cursor, len(block))
            cursor = 0
        else:
            # The boundary straddles the two blocks; the next part begins past its tail.
            part.process_data(block, cursor, split_at)
            yield part.result()
            part = new_part()
            cursor = split_at - len(block) + delimiter_length
        block = following
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment