Skip to content

Instantly share code, notes, and snippets.

@samsonjs
Created December 28, 2021 21:52
Show Gist options
  • Save samsonjs/455e59fd75b2783071cc2215c3b3e3e1 to your computer and use it in GitHub Desktop.
Save samsonjs/455e59fd75b2783071cc2215c3b3e3e1 to your computer and use it in GitHub Desktop.
Python 3.9's mailbox.mbox class patched to properly handle body lines that start with "From " in multipart messages
class mbox(_mboxMMDF):
    """A classic mbox mailbox.

    Differs from a plain "From "-line scanner in one respect: while the
    table of contents is being built, a line starting with "From " is not
    treated as a message separator if it occurs inside an open MIME
    multipart body (see _generate_toc).
    """

    _mangle_from_ = True

    # All messages must end in a newline character, and
    # _post_message_hook outputs an empty line between messages.
    _append_newline = True

    def __init__(self, path, factory=None, create=True):
        """Initialize an mbox mailbox."""
        self._message_factory = mboxMessage
        _mboxMMDF.__init__(self, path, factory, create)

    def _post_message_hook(self, f):
        """Called after writing each message to file f."""
        f.write(linesep)

    def _generate_toc(self):
        """Generate key-to-(start, stop) table of contents.

        Scans self._file from the beginning and records the byte offsets of
        every message.  A line starting with b'From ' begins a new message
        unless a multipart boundary is currently open, in which case the
        line is taken to be body text.

        Boundaries are discovered from quoted ``boundary="..."`` parameters
        and are considered closed when the matching ``--boundary--`` line is
        seen.  The opening ``--boundary`` delimiter lines are not checked.

        Known limitation: if a declared boundary is never closed (truncated
        or malformed message), every following "From " line is treated as
        body text and the remaining messages are merged into one.

        Sets self._toc, self._next_key and self._file_length.
        """
        starts, stops = [], []
        last_was_empty = False
        # Multipart segments can be nested, so open boundaries form a stack.
        multipart_boundaries = []
        # Work on bytes directly: no per-line decode, and boundaries are
        # compared exactly as they appear in the file.  Parameter names are
        # case-insensitive in MIME, hence IGNORECASE.
        find_boundary = re.compile(rb'boundary="([^"]+)"', re.IGNORECASE).search
        self._file.seek(0)
        while True:
            line_pos = self._file.tell()
            line = self._file.readline()
            if line.startswith(b'From ') and not multipart_boundaries:
                if len(stops) < len(starts):
                    if last_was_empty:
                        stops.append(line_pos - len(linesep))
                    else:
                        # The last line before the "From " line wasn't
                        # blank, but we consider it a start of a
                        # message anyway.
                        stops.append(line_pos)
                starts.append(line_pos)
                last_was_empty = False
            elif not line:
                # End of file: close the last message.
                if last_was_empty:
                    stops.append(line_pos - len(linesep))
                else:
                    stops.append(line_pos)
                break
            elif line == linesep:
                last_was_empty = True
            else:
                last_was_empty = False
                # Check for new boundaries and push them on the stack.  We
                # assume that the actual boundary line will show up and
                # don't explicitly check for it.
                declared = find_boundary(line)
                if declared:
                    multipart_boundaries.append(declared.group(1))
                elif multipart_boundaries:
                    # Strip the line ending (LF or CRLF) and any trailing
                    # padding before looking for a closing delimiter.
                    delimiter = line.rstrip()
                    if delimiter.startswith(b'--') and delimiter.endswith(b'--'):
                        closed = delimiter[2:-2]
                        # Closing an outer boundary also ends any inner
                        # parts that were left unterminated, so unwind the
                        # stack down to (and including) the match.
                        for depth in range(len(multipart_boundaries) - 1, -1, -1):
                            if multipart_boundaries[depth] == closed:
                                del multipart_boundaries[depth:]
                                break
        self._toc = dict(enumerate(zip(starts, stops)))
        self._next_key = len(self._toc)
        self._file_length = self._file.tell()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment