Converts old, corrupted Pegasus Mail mail stores into portable mboxo or maildir stores suitable for import into almost any other mail client
#!/usr/bin/python3 | |
# Converts Pegasus Mail v4.x .PMM files into Unix mbox files using lots
# of heuristics to repair any corruption in the Pegasus mail store
# (the "Unix mbox" feature of Pegasus produces malformed Unix mbox files)
# (C) 2016 Niall Douglas http://www.nedprod.com/
# File created: Dec 2016
#
# Best used with Python 3.6; anything older has less capable email and
# mailbox modules
import email
import email.policy
import glob
import mailbox
import mmap
import os
import re
import shutil
import sys
maildir = False  # makes maildirs instead of mboxes.

# Loose pattern for the start of an RFC 822 style header line ("Name:").
mailheaderre = b'[-A-Za-z0-9]+:'
# Policy used when serialising messages into the output mailbox.
outpolicy = email.policy.default.clone(linesep='\n', mangle_from_=True, max_line_length=998, refold_source='none')
# Strict input policy: any parser defect raises, which triggers the repair heuristics.
inpolicy = email.policy.default.clone(linesep='\r\n', mangle_from_=True, max_line_length=998, refold_source='none', raise_on_defect=True)
# Lenient input policy used as a fallback when strict parsing fails.
inpolicy2 = email.policy.default.clone(linesep='\r\n', mangle_from_=True, max_line_length=998, refold_source='none')
# Maps mail folder name -> number of messages converted, reported at the end.
tocheck = {}


def read_store_name(data):
    """Return the folder name stored in the first 128 bytes of a .PMM store.

    The name is taken up to the first NUL byte.  If the field contains no
    NUL at all, the whole field is used (the previous ``find(0)`` slicing
    silently dropped the last character in that case).

    data -- bytes-like store contents (``bytes`` or an ``mmap`` object).
    """
    return bytes(data[:128]).split(b'\0', 1)[0].decode('utf-8')


def find_message_end(data, idx):
    """Return the offset of the ASCII 26 byte that ends the message at idx.

    Pegasus uses ASCII 26 (EOF) to separate emails, unfortunately these
    also crop up in email bodies because Pegasus makes no attempt to filter
    them out.  A separator is accepted when it is preceded by CR or LF, or
    when it is immediately followed by something that looks like a mail
    header; any other ASCII 26 is reported and skipped.  If no acceptable
    separator remains, ``len(data)`` is returned.

    data -- bytes-like store contents (``bytes`` or an ``mmap`` object).
    idx  -- offset at which the current message starts.
    """
    eidx = data.find(b'\x1a', idx)
    while eidx != -1:
        # Slice rather than index so that eidx == 0 cannot wrap around.
        if data[max(eidx - 1, 0):eidx] in (b'\r', b'\n'):
            break
        # If this ASCII 26 wasn't preceded with a newline of some form,
        # is it immediately followed by a well formed email header?
        if re.match(mailheaderre, data[eidx + 1:eidx + 256]):
            break
        print(' WARNING: Encountered a separator in a message', data[max(eidx - 50, 0):eidx + 50], ', ignoring')
        eidx = data.find(b'\x1a', eidx + 1)
    return len(data) if eidx == -1 else eidx


def clean_message(raw):
    """Return raw message bytes with known Pegasus corruption repaired.

    Two repairs are applied:

    * (CR)+LF messages: when every non-empty line (split on CRLF) ends with
      an extra CR, all trailing CRs are stripped so the message uses plain
      CRLF line endings.
    * Any ASCII 26 (EOF) bytes embedded in the message are removed.  This
      is applied after CR collapsing as well, so that a message suffering
      from both problems is fully cleaned.

    raw -- the message as ``bytes``.
    """
    if b'\r\r' in raw:
        lines = raw.split(b'\r\n')
        # The piece after the final CRLF is not a complete line, so skip it.
        offender = next((line for line in lines[:-1] if line and not line.endswith(b'\r')), None)
        if offender is None:
            raw = b'\r\n'.join(line.rstrip(b'\r') for line in lines)
        else:
            # Not uniformly CRCRLF: show the line which broke the pattern.
            print(repr(offender))
    if b'\x1a' in raw:
        print(' NOTE: Eliminating ASCII 26 (EOF) from message')
        raw = raw.replace(b'\x1a', b'')
    return raw


def parse_messages(data, start=128):
    """Parse every message found in a .PMM store and return them as a list.

    Each message is first parsed strictly (``inpolicy``).  If that fails and
    the text begins with a well formed header it is re-parsed leniently
    (``inpolicy2``); otherwise the text is assumed to be a truncated write
    missing the front of the email and parsing restarts from the next
    header line.

    data  -- bytes-like store contents (``bytes`` or an ``mmap`` object).
    start -- offset of the first message (the name field occupies 0..127).

    Raises Exception if a fragment contains no header line to restart from.
    """
    msgs = []
    size = len(data)
    idx = start
    while idx < size:
        eidx = find_message_end(data, idx)
        while eidx > idx:
            raw = data[idx:eidx]
            msgtxt = clean_message(raw)
            try:
                msg = email.message_from_bytes(msgtxt, policy=inpolicy)
            except Exception as ex:
                # Deliberately broad: any failure of the strict parser,
                # whatever its type, is handled by the heuristics below.
                # Does this email begin with a well formed email header?
                if re.match(mailheaderre, raw):
                    print(' WARNING: Failed to parse email at', data[idx:idx + 50], 'due to', type(ex).__name__, ', retrying without strict parsing')
                    msg = email.message_from_bytes(msgtxt, policy=inpolicy2)
                else:
                    # If not, it's a truncated write missing some of the front of the email,
                    # best we can do is try again from the next header
                    match = re.search(b'\n' + mailheaderre, raw)
                    if not match:
                        print(raw)
                        raise Exception("Failed to find next header in email")
                    restart = idx + match.start() + 1
                    print(' WARNING: Failed to parse email at', data[idx:idx + 50], 'due to', type(ex).__name__, ', restarting from next header at', data[restart:restart + 49])
                    idx = restart
                    continue
            msgs.append(msg)
            break
        idx = eidx + 1
    return msgs


def write_mailbox(msgs, outpath, use_maildir):
    """Write msgs to a fresh mailbox, replacing any previous export.

    msgs        -- parsed email messages to store.
    outpath     -- output path without extension; an mbox is written to
                   ``outpath + '.MBX'``, a maildir to ``outpath`` itself.
    use_maildir -- True to create a maildir, False to create an mbox.
    """
    print(' Writing', len(msgs), 'emails to Unix mailbox', outpath, '...')
    if use_maildir:
        if os.path.exists(outpath):
            # An existing maildir is never empty (cur/new/tmp), so it has
            # to be removed recursively; os.removedirs() would fail here.
            shutil.rmtree(outpath)
        oh = mailbox.Maildir(outpath)
    else:
        if os.path.exists(outpath + '.MBX'):
            os.remove(outpath + '.MBX')
        oh = mailbox.mbox(outpath + '.MBX')
    oh.lock()
    try:
        for msg in msgs:
            oh.add(msg.as_bytes(policy=outpolicy))
    finally:
        oh.flush()
        oh.unlock()
        oh.close()
    if use_maildir:
        # maildir puts added messages into the new directory. We need them in cur.
        os.rmdir(os.path.join(outpath, 'cur'))
        os.rename(os.path.join(outpath, 'new'), os.path.join(outpath, 'cur'))
        os.mkdir(os.path.join(outpath, 'new'))


def main(mailglob='c:\\PMAIL\\MAIL\\*.PMM', exportdir='c:\\PMAIL\\EXPORT\\'):
    """Convert every Pegasus store matching mailglob into exportdir.

    mailglob  -- glob pattern selecting the .PMM files to convert.
    exportdir -- directory receiving the mailboxes; created if missing.

    The number of messages converted per folder is recorded in the module
    level ``tocheck`` dict and printed at the end so it can be compared
    against what Pegasus itself reports.
    """
    print("NOTE: Make SURE you have run 'Recover deleted space' on every Pegasus mailbox before running this program, otherwise deleted messages will be converted over!")
    for mailfile in glob.glob(mailglob):
        if os.path.getsize(mailfile) == 0:
            # mmap refuses to map a zero length file.
            print('\n\n WARNING: Skipping empty file', mailfile)
            continue
        # The store is binary data, so open it in binary mode.
        with open(mailfile, 'rb') as _ih, mmap.mmap(_ih.fileno(), 0, access=mmap.ACCESS_READ) as ih:
            mailfilename = read_store_name(ih)
            print('\n\nParsing file', mailfile, 'named', mailfilename)
            msgs = parse_messages(ih)
        tocheck[mailfilename] = len(msgs)
        os.makedirs(exportdir, exist_ok=True)
        write_mailbox(msgs, os.path.join(exportdir, mailfilename), maildir)
    print('\n\nYou NEED to check Pegasus to make sure I parsed the mailfiles correctly:')
    for mailfilename, msgcount in tocheck.items():
        print(' I reckon mail file', mailfilename, 'contains', msgcount, 'messages')


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment