Skip to content

Instantly share code, notes, and snippets.

@ned14
Created December 30, 2016 02:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ned14/f261bfda5e376959ab3588242df0a1ef to your computer and use it in GitHub Desktop.
Save ned14/f261bfda5e376959ab3588242df0a1ef to your computer and use it in GitHub Desktop.
Converts old, corrupted Pegasus Mail mail stores into portable mboxo or maildir stores suitable for import into almost any other mail client
#!/usr/bin/python3
# Converts Pegasus Mail v4.x .PMM files into Unix mbox files using lots
# of heuristics to repair any corruption in the Pegasus mail store
# (the "Unix mbox" feature of Pegasus produces malformed Unix mbox files)
# (C) 2016 Niall Douglas http://www.nedprod.com/
# File created: Dec 2016
#
# Best used with Python 3.6; anything older has less capable email and
# mailbox modules
import email
import email.policy
import glob
import mailbox
import mmap
import os
import re
import shutil
import sys
# Output format switch: True writes maildir folders, False writes mbox files.
maildir = False

print("NOTE: Make SURE you have run 'Recover deleted space' on every Pegasus mailbox before running this program, otherwise deleted messages will be converted over!")

# Bytes regex for the start of a header line: a field name followed by a colon.
mailheaderre = b'[-A-Za-z0-9]+:'

# All three policies share these settings; they differ only in line separator
# and in whether a parsing defect raises.
# inpolicy2 is the lenient input policy (CRLF, defects are not raised).
inpolicy2 = email.policy.default.clone(
    linesep='\r\n',
    mangle_from_=True,
    max_line_length=998,
    refold_source='none',
)
# inpolicy is the strict input policy: identical, but defects raise.
inpolicy = inpolicy2.clone(raise_on_defect=True)
# outpolicy serialises with bare LF line endings.
outpolicy = inpolicy2.clone(linesep='\n')

# Maps each mailbox name to the number of messages parsed from it.
tocheck = {}
def read_folder_name(header):
    """Return the folder name stored at the start of a .PMM file.

    header is the leading bytes of the file (the script reads 128). The name
    is taken up to the first NUL byte; if there is no NUL the whole field is
    used rather than silently dropping its last character.
    """
    # NOTE(review): the name is decoded as UTF-8 as before; a name stored in a
    # legacy code page would raise UnicodeDecodeError here - confirm encoding.
    return header.split(b'\0', 1)[0].decode('utf-8')


def find_message_end(data, start, header_re):
    """Return the index of the ASCII 26 separator ending the message at start.

    Pegasus uses ASCII 26 (EOF) to separate emails, unfortunately these also
    crop up in email bodies because Pegasus makes no attempt to filter them
    out. A candidate separator is accepted only if it is preceded by CR or LF,
    or is immediately followed by something matching header_re. Any other
    candidate is reported and skipped. Returns len(data) when no acceptable
    separator remains.

    data may be an mmap or a bytes object; header_re is a bytes regex.
    """
    eidx = data.find(b'\x1a', start)
    while eidx != -1:
        after_newline = data[eidx - 1] in (10, 13)
        if after_newline or re.match(header_re, data[eidx + 1:eidx + 256]):
            return eidx
        print(' WARNING: Encountered a separator in a message', data[eidx - 50:eidx + 50], ', ignoring')
        eidx = data.find(b'\x1a', eidx + 1)
    return len(data)


def repair_message_bytes(raw):
    """Return raw with CRCRLF line endings collapsed and ASCII 26 removed.

    A message is treated as CR+CRLF only when it contains b'\\r\\r' and every
    non-empty CRLF-delimited line except the final fragment ends in CR; in
    that case all trailing CRs are stripped from every line. Embedded ASCII 26
    bytes are then removed whether or not the line endings were collapsed.
    """
    if b'\r\r' in raw:
        lines = raw.split(b'\r\n')
        if all(not line or line.endswith(b'\r') for line in lines[:-1]):
            raw = b'\r\n'.join(line.rstrip(b'\r') for line in lines)
    if b'\x1a' in raw:
        print(' NOTE: Eliminating ASCII 26 (EOF) from message')
        raw = raw.replace(b'\x1a', b'')
    return raw


def parse_message(data, start, end, header_re, strict_policy, lenient_policy):
    """Parse data[start:end] into an email message, repairing where possible.

    Parsing is first attempted with strict_policy. On failure:
      * if the fragment begins with a header, it is re-parsed with
        lenient_policy;
      * otherwise the fragment is assumed to be a truncated write missing the
        front of the email, and parsing restarts from the next line that
        begins with a header.

    Returns the parsed message, or None when the range is empty.
    Raises Exception if a headerless fragment contains no later header line.
    """
    while end > start:
        raw = data[start:end]
        body = repair_message_bytes(raw)
        try:
            return email.message_from_bytes(body, policy=strict_policy)
        except Exception as ex:
            # Deliberately broad: any failure of the strict parse is treated
            # as a cue to attempt recovery.
            if re.match(header_re, raw):
                print(' WARNING: Failed to parse email at', raw[:50], 'due to', type(ex).__name__, ', retrying without strict parsing')
                return email.message_from_bytes(body, policy=lenient_policy)
            match = re.search(b'\n' + header_re, raw)
            if not match:
                print(raw)
                raise Exception("Failed to find next header in email") from ex
            restart = match.start() + 1  # skip the newline itself
            print(' WARNING: Failed to parse email at', raw[:50], 'due to', type(ex).__name__, ', restarting from next header at', raw[restart:restart + 49])
            start += restart
    return None


def write_mailbox(outpath, msgs, as_maildir, policy):
    """Write msgs to a fresh maildir at outpath, or mbox at outpath + '.MBX'.

    Any previous export at the target is deleted first. Each message is
    serialised with policy before being added.
    """
    # NOTE(review): policy is passed through unchanged, so a policy with
    # mangle_from_=True also escapes "From " body lines in maildir output,
    # where it looks unnecessary - confirm whether that is intended.
    parent = os.path.dirname(outpath)
    if parent:
        os.makedirs(parent, exist_ok=True)
    if as_maildir:
        if os.path.exists(outpath):
            # A maildir always holds cur/new/tmp, so it is never empty;
            # os.removedirs would fail here.
            shutil.rmtree(outpath)
        oh = mailbox.Maildir(outpath)
    else:
        target = outpath + '.MBX'
        if os.path.exists(target):
            os.remove(target)
        oh = mailbox.mbox(target)
    oh.lock()
    try:
        for msg in msgs:
            oh.add(msg.as_bytes(policy=policy))
    finally:
        oh.flush()
        oh.unlock()
        oh.close()
    if as_maildir:
        # maildir puts added messages into the new directory. We need them in cur.
        cur_dir = os.path.join(outpath, 'cur')
        new_dir = os.path.join(outpath, 'new')
        os.rmdir(cur_dir)
        os.rename(new_dir, cur_dir)
        os.mkdir(new_dir)


# Convert every Pegasus mail store found in the mail directory.
for mailfile in glob.glob('c:\\PMAIL\\MAIL\\*.PMM'):
    if os.path.getsize(mailfile) == 0:
        # mmap cannot map a zero-length file.
        print('\n\nSkipping empty file', mailfile)
        continue
    msgs = []
    # The store is binary, so open it in binary mode before mapping it.
    with open(mailfile, 'rb') as _ih, mmap.mmap(_ih.fileno(), 0, access=mmap.ACCESS_READ) as ih:
        mailfilename = read_folder_name(ih.read(128))
        print('\n\nParsing file', mailfile, 'named', mailfilename)
        idx = 128  # message data starts after the 128-byte name field
        while idx < ih.size():
            eidx = find_message_end(ih, idx, mailheaderre)
            msg = parse_message(ih, idx, eidx, mailheaderre, inpolicy, inpolicy2)
            if msg is not None:
                msgs.append(msg)
            idx = eidx + 1
    tocheck[mailfilename] = len(msgs)
    outpath = 'c:\\PMAIL\\EXPORT\\' + mailfilename
    print(' Writing', len(msgs), 'emails to Unix mailbox', outpath, '...')
    write_mailbox(outpath, msgs, maildir, outpolicy)
# Final report: list the per-mailbox message counts so they can be compared
# by hand against what Pegasus itself shows.
print('\n\nYou NEED to check Pegasus to make sure I parsed the mailfiles correctly:')
for mailfilename in tocheck:
    msgcount = tocheck[mailfilename]
    print(' I reckon mail file', mailfilename, 'contains', msgcount, 'messages')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment