Skip to content

Instantly share code, notes, and snippets.

@ned14
Created December 30, 2016 02:13
Embed
What would you like to do?
Converts old, corrupted Pegasus Mail mail stores into portable mboxo or maildir stores suitable for import into almost any other mail client
#!/usr/bin/python3
# Converts Pegasus Mail v4.x .PMM files into Unix mbox files using lots
# of heuristics to repair any corruption in the Pegasus mail store
# (the "Unix mbox" feature of Pegasus produces malformed Unix mbox files)
# (C) 2016 Niall Douglas http://www.nedprod.com/
# File created: Dec 2016
#
# Best used with Python 3.6 or newer; older versions have less capable email
# and mailbox modules
import sys, os, glob, mmap, re, email, email.policy, mailbox
import shutil

maildir = False  # makes maildirs instead of mboxes.
print("NOTE: Make SURE you have run 'Recover deleted space' on every Pegasus mailbox before running this program, otherwise deleted messages will be converted over!")
# Loose test for "these bytes start with an RFC 822 style header field name".
mailheaderre = b'[-A-Za-z0-9]+:'
# Policy used when serialising messages into the output mailbox (LF line ends).
outpolicy = email.policy.default.clone(linesep='\n', mangle_from_=True, max_line_length=998, refold_source='none')
# Strict input policy: the first defect raises, which is how damage is detected.
inpolicy = email.policy.default.clone(linesep='\r\n', mangle_from_=True, max_line_length=998, refold_source='none', raise_on_defect=True)
# Lenient input policy used as the fallback when strict parsing fails.
inpolicy2 = email.policy.default.clone(linesep='\r\n', mangle_from_=True, max_line_length=998, refold_source='none')
# Maps mailbox name -> number of messages recovered, reported at the end.
tocheck = {}


def find_message_end(store, idx):
    """Return the offset of the separator that ends the message starting at idx.

    Pegasus uses ASCII 26 (EOF) to separate emails, unfortunately these
    also crop up in email bodies because Pegasus makes no attempt to filter
    them out. Even better, Pegasus actually treats these fragments of an
    email as separate leading to guaranteed corruption when you delete
    messages or "repair" the mailbox using Pegasus.

    An ASCII 26 is accepted as a real separator when it is preceded by a CR
    or LF, or when it is immediately followed by something that looks like a
    mail header. Any other ASCII 26 is reported and skipped. When no
    separator remains, len(store) is returned.

    store may be a bytes object or an mmap; idx is the offset to search from.
    """
    eidx = store.find(b'\x1a', idx)
    while eidx != -1:
        if store[eidx - 1] in (13, 10):
            break
        # Not preceded by a newline of some form: is it immediately followed
        # by a well formed email header?
        if re.match(mailheaderre, store[eidx + 1:eidx + 256]):
            break
        print(' WARNING: Encountered a separator in a message', store[max(eidx - 50, 0):eidx + 50], ', ignoring')
        eidx = store.find(b'\x1a', eidx + 1)
    if eidx == -1:
        eidx = len(store)
    return eidx


def normalise_message(raw):
    """Return raw message bytes with Pegasus-specific damage repaired.

    Two repairs are applied:
    * If every non-empty line (split on CRLF) ends in an extra CR, the
      message is a (CR)+LF message and is collapsed to plain CRLF.
    * Any ASCII 26 (EOF) bytes are removed. This now also happens for
      messages that needed the CRCRLF collapse.
    """
    lines = raw.split(b'\r\n') if b'\r\r' in raw else None
    if lines is not None:
        for line in lines[:-1]:
            if line and not line.endswith(b'\r'):
                # A normally terminated line: not a CRCRLF message after all.
                print(repr(line))
                lines = None
                break
    if lines is not None:
        raw = b'\r\n'.join(line.rstrip(b'\r') for line in lines)
    if b'\x1a' in raw:
        print(' NOTE: Eliminating ASCII 26 (EOF) from message')
        raw = raw.replace(b'\x1a', b'')
    return raw


def parse_messages(store, idx=128):
    """Parse every message found in store from offset idx onwards.

    store may be a bytes object or an mmap of a .PMM file; the default
    offset skips the 128 byte mailbox name field. Returns the list of
    parsed email messages.

    Raises Exception when a fragment fails strict parsing and contains no
    line that looks like a mail header to restart from.
    """
    msgs = []
    end = len(store)
    while idx < end:
        eidx = find_message_end(store, idx)
        while eidx > idx:
            raw = store[idx:eidx]
            payload = normalise_message(raw)
            try:
                msg = email.message_from_bytes(payload, policy=inpolicy)
            except Exception as ex:
                # Does this email begin with a well formed email header?
                if re.match(mailheaderre, raw):
                    print(' WARNING: Failed to parse email at', store[idx:idx + 50], 'due to', type(ex).__name__, ', retrying without strict parsing')
                    msg = email.message_from_bytes(payload, policy=inpolicy2)
                else:
                    # If not, it's a truncated write missing some of the front of the email,
                    # best we can do is try again from the next header
                    match = re.search(b'\n' + mailheaderre, raw)
                    if not match:
                        print(raw)
                        raise Exception("Failed to find next header in email") from ex
                    print(' WARNING: Failed to parse email at', store[idx:idx + 50], 'due to', type(ex).__name__, ', restarting from next header at', store[idx + match.start() + 1:idx + match.start() + 50])
                    idx = idx + match.start() + 1
                    continue
            msgs.append(msg)
            break
        idx = eidx + 1
    return msgs


def read_store(mailfile):
    """Read one .PMM file and return (mailbox name, list of parsed messages).

    The mailbox name is the NUL terminated string held in the first 128
    bytes of the file, decoded as UTF-8.
    """
    # The store is binary data, so open it in binary mode for the mmap.
    with open(mailfile, 'rb') as fh:
        with mmap.mmap(fh.fileno(), 0, access=mmap.ACCESS_READ) as store:
            name_field = store.read(128)
            # Split on the first NUL so a field without a terminator is kept whole.
            mailfilename = name_field.split(b'\0', 1)[0].decode('utf-8')
            print('\n\nParsing file', mailfile, 'named', mailfilename)
            return mailfilename, parse_messages(store, 128)


def open_output_mailbox(outpath, as_maildir):
    """Create a fresh, empty output mailbox and return it.

    With as_maildir a Maildir is created at outpath, otherwise an mbox is
    created at outpath + '.MBX'. Any previous output at that location is
    deleted first, and the parent directory is created if it is missing.
    """
    parent = os.path.dirname(outpath)
    if parent:
        os.makedirs(parent, exist_ok=True)
    if as_maildir:
        if os.path.exists(outpath):
            # os.removedirs() only removes empty directories (and then prunes
            # empty parents), so it cannot clear a populated Maildir.
            shutil.rmtree(outpath)
        return mailbox.Maildir(outpath)
    if os.path.exists(outpath + '.MBX'):
        os.remove(outpath + '.MBX')
    return mailbox.mbox(outpath + '.MBX')


def write_mailbox(outpath, msgs, as_maildir):
    """Write msgs into a new mailbox at outpath (see open_output_mailbox)."""
    oh = open_output_mailbox(outpath, as_maildir)
    oh.lock()
    try:
        for msg in msgs:
            oh.add(msg.as_bytes(policy=outpolicy))
    finally:
        oh.flush()
        oh.unlock()
        oh.close()
    if as_maildir:
        # maildir puts added messages into the new directory. We need them in cur.
        os.rmdir(os.path.join(outpath, 'cur'))
        os.rename(os.path.join(outpath, 'new'), os.path.join(outpath, 'cur'))
        os.mkdir(os.path.join(outpath, 'new'))


# Convert every Pegasus store found in the hard-coded mail directory.
for mailfile in glob.glob('c:\\PMAIL\\MAIL\\*.PMM'):
    mailfilename, msgs = read_store(mailfile)
    tocheck[mailfilename] = len(msgs)
    outpath = 'c:\\PMAIL\\EXPORT\\' + mailfilename
    print(' Writing', len(msgs), 'emails to Unix mailbox', outpath, '...')
    write_mailbox(outpath, msgs, maildir)

print('\n\nYou NEED to check Pegasus to make sure I parsed the mailfiles correctly:')
for mailfilename, msgcount in tocheck.items():
    print(' I reckon mail file', mailfilename, 'contains', msgcount, 'messages')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment