Created
December 30, 2016 02:13
-
-
Save ned14/f261bfda5e376959ab3588242df0a1ef to your computer and use it in GitHub Desktop.
Converts old, corrupted Pegasus Mail mail stores into portable mboxo or maildir stores suitable for import into almost any other mail client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3
# Converts Pegasus Mail v4.x .PMM files into Unix mbox files using lots
# of heuristics to repair any corruption in the Pegasus mail store
# (the "Unix mbox" feature of Pegasus produces malformed Unix mbox files)
# (C) 2016 Niall Douglas http://www.nedprod.com/
# File created: Dec 2016
#
# Best used with Python 3.6 or later; older versions have less capable
# email and mailbox modules
import email
import email.policy
import glob
import mailbox
import mmap
import os
import re
import shutil
import sys
maildir=False # makes maildirs instead of mboxes. | |
print("NOTE: Make SURE you have run 'Recover deleted space' on every Pegasus mailbox before running this program, otherwise deleted messages will be converted over!") | |
mailheaderre=b'[-A-Za-z0-9]+:' | |
outpolicy=email.policy.default.clone(linesep='\n', mangle_from_=True, max_line_length=998, refold_source='none') | |
inpolicy=email.policy.default.clone(linesep='\r\n', mangle_from_=True, max_line_length=998, refold_source='none', raise_on_defect=True) | |
inpolicy2=email.policy.default.clone(linesep='\r\n', mangle_from_=True, max_line_length=998, refold_source='none') | |
tocheck={} | |
for mailfile in glob.glob('c:\\PMAIL\\MAIL\\*.PMM'): | |
with open(mailfile, 'rt') as _ih: | |
msgs=[] | |
with mmap.mmap(_ih.fileno(), 0, access=mmap.ACCESS_READ) as ih: | |
mailfilename=ih.read(128) | |
mailfilename=mailfilename[:mailfilename.find(0)].decode('utf-8') | |
#if mailfilename!='General': | |
# continue | |
print('\n\nParsing file', mailfile, 'named', mailfilename) | |
idx=128 | |
while idx<ih.size(): | |
# Pegasus uses ASCII 26 (EOF) to separate emails, unfortunately these | |
# also crop up in email bodies because Pegasus makes no attempt to filter | |
# them out. Even better, Pegasus actually treats these fragments of an | |
# email as separate leading to guaranteed corruption when you delete | |
# messages or "repair" the mailbox using Pegasus. | |
eidx=ih.find(b'\x1a', idx) | |
warnings=False | |
while True: | |
if eidx==-1: | |
eidx=ih.size() | |
else: | |
if ih[eidx-1]!=13 and ih[eidx-1]!=10: | |
# If this ASCII 26 wasn't preceded with a newline of some form, | |
# is it immediately followed by a well formed email header? | |
if not re.match(mailheaderre, ih[eidx+1:eidx+256]): | |
print(' WARNING: Encountered a separator in a message', ih[eidx-50:eidx+50], ', ignoring') | |
eidx=ih.find(b'\x1a', eidx+1) | |
warnings=True | |
continue | |
break | |
while eidx>idx: | |
# Detect (CR)+LF messages | |
doubled=-1!=ih.find(b'\r\r', idx, eidx) | |
if doubled: | |
msgtxt=ih[idx:eidx].split(b'\r\n') | |
for line in msgtxt[:-1]: | |
if len(line) and not line.endswith(b'\r'): | |
print(repr(line)) | |
doubled=False | |
break | |
if doubled: | |
#print(' NOTE: Detected CRCRLF message, collapsing into CRLF message') | |
for line in range(len(msgtxt)): | |
while len(msgtxt[line]) and msgtxt[line][-1]==13: | |
msgtxt[line]=msgtxt[line][:-1] | |
msgtxt=b'\r\n'.join(msgtxt) | |
if not doubled: | |
doubled=-1!=ih.find(b'\x1a', idx, eidx) | |
if doubled: | |
print(' NOTE: Eliminating ASCII 26 (EOF) from message') | |
msgtxt=ih[idx:eidx].replace(b'\x1a', b'') | |
try: | |
msg=email.message_from_bytes(msgtxt if doubled else ih[idx:eidx], policy=inpolicy) | |
msgs.append(msg) | |
break | |
except Exception as ex: | |
# Does this email begin with a well formed email header? | |
if re.match(mailheaderre, ih[idx:eidx]): | |
print(' WARNING: Failed to parse email at', ih[idx:idx+50], 'due to', type(ex).__name__, ', retrying without strict parsing') | |
msg=email.message_from_bytes(msgtxt if doubled else ih[idx:eidx], policy=inpolicy2) | |
msgs.append(msg) | |
warnings=True | |
break | |
else: | |
# If not, it's a truncated write missing some of the front of the email, | |
# best we can do is try again from the next header | |
match=re.search(b'\n'+mailheaderre, ih[idx:eidx]) | |
if not match: | |
print(ih[idx:eidx]) | |
raise Exception("Failed to find next header in email") | |
print(' WARNING: Failed to parse email at', ih[idx:idx+50], 'due to', type(ex).__name__, ', restarting from next header at', ih[idx+match.start()+1:idx+match.start()+50]) | |
warnings=True | |
idx=idx+match.start()+1 | |
idx=eidx+1 | |
#if warnings: | |
# msgbytes=msg.as_bytes(unixfrom=False, policy=email.policy.compat32) | |
# print(msgbytes) | |
#break | |
tocheck[mailfilename]=len(msgs) | |
outpath='c:\\PMAIL\\EXPORT\\'+mailfilename | |
print(' Writing', len(msgs), 'emails to Unix mailbox', outpath, '...') | |
if maildir: | |
if os.path.exists(outpath): | |
os.removedirs(outpath) | |
oh=mailbox.Maildir(outpath) | |
else: | |
if os.path.exists(outpath+'.MBX'): | |
os.remove(outpath+'.MBX') | |
oh=mailbox.mbox(outpath+'.MBX') | |
oh.lock() | |
try: | |
for msg in msgs: | |
#msg.set_unixfrom('foo') | |
msgbytes=msg.as_bytes(policy=outpolicy) | |
oh.add(msgbytes) | |
finally: | |
oh.flush() | |
oh.unlock() | |
oh.close() | |
if maildir: | |
# maildir puts added messages into the new directory. We need them in cur. | |
os.rmdir(outpath+'\\cur') | |
os.rename(outpath+'\\new', outpath+'\\cur') | |
os.mkdir(outpath+'\\new') | |
#break | |
print('\n\nYou NEED to check Pegasus to make sure I parsed the mailfiles correctly:') | |
for mailfilename, msgcount in tocheck.items(): | |
print(' I reckon mail file', mailfilename, 'contains', msgcount, 'messages') | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment