Skip to content

Instantly share code, notes, and snippets.

@sevein
Last active October 9, 2022 00:24
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save sevein/a0ac9af683f056988c36 to your computer and use it in GitHub Desktop.
Save sevein/a0ac9af683f056988c36 to your computer and use it in GitHub Desktop.
mbox extractor

mboxext

mboxext extracts the file attachments from the messages of an Mbox mailbox.

Example

$ python mboxext.py -v /home/vagrant/Sent.mbox /home/vagrant/output/

2015-10-01 17:54:46,060 [mboxext] INFO - Target directory already exists and it's not empty
2015-10-01 17:54:46,061 [mboxext] INFO - Opening Mbox mailbox: /home/vagrant/Sent.mbox
2015-10-01 17:54:49,356 [mboxext] INFO - 2101 messages found
2015-10-01 17:54:49,652 [mboxext] INFO - Message extraction progress: 100/2101
2015-10-01 17:54:50,002 [mboxext] INFO - Message extraction progress: 200/2101
2015-10-01 17:54:50,121 [mboxext] INFO - Message extraction progress: 300/2101
2015-10-01 17:54:50,399 [mboxext] INFO - Message extraction progress: 400/2101
2015-10-01 17:54:50,864 [mboxext] INFO - Message extraction progress: 500/2101
2015-10-01 17:54:51,127 [mboxext] INFO - Message extraction progress: 600/2101
2015-10-01 17:54:51,383 [mboxext] INFO - Message extraction progress: 700/2101
2015-10-01 17:54:51,762 [mboxext] INFO - Message extraction progress: 800/2101
2015-10-01 17:54:52,087 [mboxext] INFO - Message extraction progress: 900/2101
2015-10-01 17:54:52,515 [mboxext] INFO - Message extraction progress: 1000/2101
2015-10-01 17:54:52,911 [mboxext] INFO - Message extraction progress: 1100/2101
2015-10-01 17:54:54,185 [mboxext] INFO - Message extraction progress: 1200/2101
2015-10-01 17:54:54,353 [mboxext] INFO - Message extraction progress: 1300/2101
2015-10-01 17:54:54,600 [mboxext] INFO - Message extraction progress: 1400/2101
2015-10-01 17:54:55,330 [mboxext] INFO - Message extraction progress: 1500/2101
2015-10-01 17:54:56,147 [mboxext] INFO - Message extraction progress: 1600/2101
2015-10-01 17:54:56,506 [mboxext] INFO - Message extraction progress: 1700/2101
2015-10-01 17:54:56,951 [mboxext] INFO - Message extraction progress: 1800/2101
2015-10-01 17:54:57,458 [mboxext] INFO - Message extraction progress: 1900/2101
2015-10-01 17:54:58,769 [mboxext] INFO - Message extraction progress: 2000/2101
2015-10-01 17:54:58,983 [mboxext] INFO - Message extraction progress: 2100/2101
2015-10-01 17:55:01,432 [mboxext] INFO - 340 files found in 2101 messages
2015-10-01 17:55:01,432 [mboxext] INFO - Closing Mbox mailbox

Links

#!/usr/bin/env python3
import os
import sys
import mailbox
import email
# Usage text printed when the script is started with fewer than two
# command-line arguments.
USAGE = u"""
$ python maildir2mbox.py [maildir_path] [mbox_path]
"""
def maildir_to_mailbox(maildir_path, mbox_path):
    """Copy every message of a Maildir into an mbox file.

    Slightly adapted from maildir2mbox.py,
    Nathan R. Yergler, 6 June 2010
    http://yergler.net/blog/2010/06/06/batteries-included-or-maildir-to-mbox-again/
    Port to Python 3 by Philippe Fremy

    A message that cannot be added is reported on stdout together with its
    traceback and then skipped, so a single bad message does not abort the
    whole conversion. Progress is printed every tenth message.

    Args:
        maildir_path: Path of the Maildir directory to read.
        mbox_path: Path of the mbox file the messages are added to.
    """
    # Imported locally: the exception handler below needs it and the
    # module-level imports do not provide it.
    import traceback

    # Open the existing maildir and the target mbox file.
    maildir = mailbox.Maildir(maildir_path, email.message_from_binary_file)
    try:
        mbox = mailbox.mbox(mbox_path)
        try:
            # Iterate over messages in the maildir and add them to the mbox.
            n = len(maildir)
            for i, (key, msg) in enumerate(maildir.iteritems()):
                if (i % 10) == 9:
                    print('Progress: msg %d of %d' % (i + 1, n))
                try:
                    mbox.add(msg)
                except Exception:
                    # Best effort: report the failure and keep going.
                    print('Exception while processing msg with key: %s' % key)
                    traceback.print_exc()
        finally:
            # Always close both mailboxes, even if iteration fails.
            mbox.close()
    finally:
        maildir.close()
if __name__ == '__main__':
    # Two arguments are required; otherwise show the usage text and stop.
    if len(sys.argv) < 3:
        print(USAGE)
        sys.exit(0)

    # The last two arguments are the source Maildir tree and the target
    # directory; both are canonicalized before use.
    maildir_path, mbox_path = (os.path.realpath(p) for p in sys.argv[-2:])
    print("{} -> {}".format(maildir_path, mbox_path))

    if not os.path.exists(mbox_path):
        os.makedirs(mbox_path)

    maildir_dirs = ['cur', 'new', 'tmp']
    for root, dirs, files in os.walk(maildir_path):
        # The top-level directory itself is never converted, and a directory
        # only counts as a Maildir when it holds cur, new and tmp.
        if root == maildir_path or not set(maildir_dirs).issubset(dirs):
            continue

        # Keep os.walk from descending into the cur, new and tmp directories.
        dirs[:] = [name for name in dirs if name not in maildir_dirs]

        # Mirror the relative location of this Maildir under the target.
        mbox_subpath = os.path.join(mbox_path,
                                    os.path.relpath(root, maildir_path))
        if not os.path.exists(mbox_subpath):
            print("Creating {}".format(mbox_subpath))
            os.makedirs(mbox_subpath)

        maildir_to_mailbox(root, os.path.join(mbox_subpath, '.mbox'))
from __future__ import print_function
import sys
import argparse
import os.path
import mailbox
import base64
import logging
# Module-level logger. It starts with only a NullHandler attached; main()
# adds a stdout handler when --verbose or --debug is given.
LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(logging.NullHandler())
# Record layout used by the stdout handler that main() installs.
LOGGER_FORMAT = '%(asctime)s [mboxext] %(levelname)s - %(message)s'
def parse_arguments():
    """Parse the command line and return the resulting namespace.

    Two positionals are expected, ``source`` and ``target``, plus the
    optional boolean switches ``-v``/``--verbose`` and ``--debug``.
    """
    cli = argparse.ArgumentParser()
    for positional in ('source', 'target'):
        cli.add_argument(positional)
    cli.add_argument('-v', '--verbose', action='store_true')
    cli.add_argument('--debug', action='store_true')
    return cli.parse_args()
def find_mbox_files(path):
    """Yield the paths of the Mbox files found at or below ``path``.

    If ``path`` is itself a file whose name ends in ``.mbox`` it is the only
    path yielded. Otherwise ``path`` is listed as a directory and searched
    recursively. Entries are visited in sorted name order so that the
    sequence (and any numbering a caller derives from it) is the same on
    every run, instead of depending on the order ``os.listdir`` returns.

    Args:
        path: A ``.mbox`` file or a directory to search.

    Raises:
        OSError: If ``path`` is neither a ``.mbox`` file nor a directory
            that can be listed.
    """
    mbox_extension = '.mbox'
    if os.path.isfile(path) and path.endswith(mbox_extension):
        yield path
        return
    for name in sorted(os.listdir(path)):
        full_path = os.path.join(path, name)
        if os.path.isdir(full_path):
            # Nested loop instead of ``yield from``: the file still carries
            # a ``__future__`` import for Python 2.
            for entry in find_mbox_files(full_path):
                yield entry
        elif os.path.isfile(full_path) and full_path.endswith(mbox_extension):
            yield full_path
def sanitize_filename(filename, default=None):
    """Reduce a file name to ASCII letters and digits plus one extension dot.

    The name is split with ``os.path.splitext``; every character that is not
    an ASCII letter or digit is dropped from both the stem and the extension
    (this also removes path separators and dots, so the result can never
    point outside the directory it is joined to).

    Args:
        filename: The original name, as text or UTF-8 encoded bytes. ``None``
            is treated like a name with no usable characters.
        default: Stand-in for the stem when no usable character is left.

    Returns:
        ``"<stem>.<ext>"`` when both parts survive, ``"<stem>"`` when only
        the stem survives, ``"<default>.<ext>"`` when only the extension
        survives, and ``default`` unchanged when nothing survives.
    """
    if filename is None:
        return default
    # Work on text: iterating bytes yields integers on Python 3, which have
    # no isalpha()/isdigit() methods.
    if isinstance(filename, bytes):
        filename = filename.decode('utf-8', 'ignore')

    def keep_ascii_alnum(text):
        return ''.join(c for c in text if ord(c) < 128 and c.isalnum())

    stem, extension = os.path.splitext(filename)
    stem = keep_ascii_alnum(stem)
    extension = keep_ascii_alnum(extension)
    if stem and extension:
        return '{}.{}'.format(stem, extension)
    if stem:
        return stem
    if extension:
        return '{}.{}'.format(default, extension)
    return default
def extract_attached_file(part, index, path):
    """Write the decoded payload of one message part into ``path``.

    The file is named after the part's sanitized file name; ``index`` is
    used as the name when sanitizing leaves nothing usable. ``path`` is
    created if it does not exist yet.

    Args:
        part: The message part to save; must provide ``get_filename()`` and
            ``get_payload(decode=True)``.
        index: Number of the message the part belongs to, used as the
            fallback file name.
        path: Directory the file is written to.
    """
    payload = part.get_payload(decode=True)
    if payload is None:
        # get_payload(decode=True) returns None for multipart containers;
        # there is nothing to write, and f.write(None) would raise.
        LOGGER.warning('No payload to extract from part of message %s', index)
        return
    if not os.path.exists(path):
        os.makedirs(path)
    # The fallback must be a string: it may be returned unchanged and is
    # then handed to os.path.join.
    filename = sanitize_filename(part.get_filename(), default=str(index))
    dest = os.path.join(path, filename)
    LOGGER.debug('New file: %s', dest)
    with open(dest, 'wb') as f:
        f.write(payload)
def extract_mbox(path, dest):
    """Extract every attached file of the Mbox mailbox at ``path``.

    The mailbox is locked while it is read. Each message is walked part by
    part, and every part that carries a file name is saved below
    ``dest/<message number>/`` (messages are numbered from 1). Progress is
    logged every 100 messages unless debug logging is enabled, in which
    case every message is logged instead. A summary is logged and the
    mailbox is closed even when extraction fails part-way.

    Args:
        path: Path of the Mbox file to read.
        dest: Directory that receives one sub-directory per message that
            has attachments.
    """
    LOGGER.info('Opening Mbox mailbox: %s', path)
    # Opened and initialized before the try block so the finally clause
    # never touches an unbound name and cannot mask the original error.
    mbox = mailbox.mbox(path)
    count = 0
    files = 0
    try:
        mbox.lock()
        count = len(mbox)
        LOGGER.info('%d messages found', count)
        show_progress = not LOGGER.isEnabledFor(logging.DEBUG)
        for index, message in enumerate(mbox, start=1):
            LOGGER.debug('Extracting message %d/%d with Messsage-ID %s', index,
                         count, message['Message-ID'])
            if show_progress and index % 100 == 0:
                LOGGER.info('Message extraction progress: %d/%d', index, count)
            message_dir = os.path.join(dest, str(index))
            for part in message.walk():
                # Only parts that declare a file name are attachments.
                if part.get_filename() is not None:
                    extract_attached_file(part, index, message_dir)
                    files += 1
    finally:
        LOGGER.info('%d files found in %d messages', files, count)
        LOGGER.info('Closing Mbox mailbox')
        mbox.close()
def main():
    """Run the extractor from the command line and return the exit status.

    Configures logging according to the ``--verbose``/``--debug`` switches,
    validates the source and target arguments, then extracts every Mbox
    file found under the source into ``<target>/<n>``, where ``n`` counts
    the mailboxes from 0.

    Raises:
        Exception: If the source does not exist or the target is an
            existing file.
    """
    args = parse_arguments()

    if args.verbose or args.debug:
        console = logging.StreamHandler(sys.stdout)
        console.setFormatter(logging.Formatter(LOGGER_FORMAT))
        LOGGER.addHandler(console)
        # --debug wins when both switches are given.
        LOGGER.setLevel(logging.DEBUG if args.debug else logging.INFO)

    source, target = args.source, args.target
    if not os.path.exists(source):
        raise Exception('Source not found: {}'.format(source))

    if not os.path.exists(target):
        os.makedirs(target)
    elif os.path.isfile(target):
        raise Exception('Target already exists: {}'.format(target))
    elif os.path.isdir(target) and len(os.listdir(target)) > 0:
        LOGGER.info("Target directory already exists and it's not empty")

    mailboxes = find_mbox_files(os.path.abspath(source))
    for position, mbox_file in enumerate(mailboxes):
        extract_mbox(mbox_file, os.path.join(target, str(position)))

    return 0


if __name__ == '__main__':
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment