Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save loicdescotte/c7bb20dae591fbecb8f208e87bf90ff5 to your computer and use it in GitHub Desktop.
Save loicdescotte/c7bb20dae591fbecb8f208e87bf90ff5 to your computer and use it in GitHub Desktop.
Extract attachments from mbox file.
# Modified.
# Original script source:
# http://blog.marcbelmont.com/2012/10/script-to-extract-email-attachments.html
# Usage:
# Run the script from a folder with file "all.mbox"
# Attachments will be extracted into subfolder "attachments"
# with prefix "n " where "n" is an order of attachment in mbox file.
# ---------------
# Please check the unpacked files
# with an antivirus before opening them!
# ---------------
# I make no representations or warranties of any kind concerning
# the software, express, implied, statutory or otherwise,
# including without limitation warranties of title, merchantability,
# fitness for a particular purpose, non infringement, or the
# absence of latent or other defects, accuracy, or the present or
# absence of errors, whether or not discoverable, all to the
# greatest extent permissible under applicable law.
import errno
import mailbox
import os
import pathlib # since Python 3.4
import re
import traceback
from email.header import decode_header
prefs = {
'file': 'all.mbox',
'save_to': 'attachments/',
'extract_inline_images': True,
'start': 0,
'stop': 100000000000 # On which message to stop (not included).
}
assert os.path.isfile(prefs['file'])
mb = mailbox.mbox(prefs['file'])
if not os.path.exists(prefs['save_to']):
os.makedirs(prefs['save_to'])
inline_image_folder = os.path.join(prefs['save_to'], 'inline_images/')
if prefs['extract_inline_images'] and not os.path.exists(inline_image_folder):
os.makedirs(inline_image_folder)
total = 0
failed = 0
def to_file_path(save_to, name):
return os.path.join(save_to, name)
def get_extension(name):
extension = pathlib.Path(name).suffix
return extension if len(extension) <= 20 else ''
def resolve_name_conflicts(save_to, name, file_paths, attachment_number):
file_path = to_file_path(save_to, name)
START = 1
iteration_number = START
while os.path.normcase(file_path) in file_paths:
extension = get_extension(name)
iteration = '' if iteration_number <= START else ' (%s)' % (iteration_number)
new_name = '%s attachment %s%s%s' % (name, attachment_number, iteration, extension)
file_path = to_file_path(save_to, new_name)
iteration_number += 1
file_paths.append(os.path.normcase(file_path))
return file_path
# Whitespaces: tab, carriage return, newline, vertical tab, formfeed.
FORBIDDEN_WHITESPACE_IN_FILENAMES = re.compile('[\t\r\n\v\f]+')
OTHER_FORBIDDEN_FN_CHARACTERS = re.compile('[/\\\\\\?%\\*:\\|"<>\0]')
def filter_fn_characters(s):
result = re.sub(FORBIDDEN_WHITESPACE_IN_FILENAMES, ' ', s)
result = re.sub(OTHER_FORBIDDEN_FN_CHARACTERS, '_', result)
return result
def save(mid, part, attachments_counter, inline_image=False):
global total
total = total + 1
try:
if inline_image:
attachments_counter['inline_image'] += 1
attachment_number = 'ii' + str(attachments_counter['inline_image'])
save_to = inline_image_folder
else:
attachments_counter['value'] += 1
attachment_number = attachments_counter['value']
save_to = prefs['save_to']
if part.get_filename() is None:
name = str(attachment_number)
print('Filename is none: %s %s.' % (mid, name))
else:
decoded_name = decode_header(part.get_filename())
if isinstance(decoded_name[0][0], str):
name = decoded_name[0][0]
else:
try:
name_encoding = decoded_name[0][1]
name = decoded_name[0][0].decode(name_encoding)
except:
name = str(attachment_number)
print('Could not decode %s %s attachment name.' % (mid, name))
name = filter_fn_characters(name)
name = '%s %s' % (mid, name)
previous_file_paths = attachments_counter['file_paths']
try:
fn = resolve_name_conflicts(save_to, name, previous_file_paths, attachment_number)
with open(fn, 'wb') as f:
f.write(part.get_payload(decode=True))
except OSError as e:
if e.errno == errno.ENAMETOOLONG:
extension = get_extension(name)
short_name = '%s %s%s' % (mid, attachment_number, extension)
fn = resolve_name_conflicts(save_to, short_name, previous_file_paths, attachment_number)
with open(fn, 'wb') as f:
f.write(part.get_payload(decode=True))
else:
raise
except:
traceback.print_exc()
global failed
failed = failed + 1
def check_part(mid, part, attachments_counter):
mime_type = part.get_content_type()
if (part.get_content_disposition() == 'attachment') \
or ((part.get_content_disposition() != 'inline') and (part.get_filename() is not None)):
save(mid, part, attachments_counter)
elif (mime_type.startswith('application/') and not mime_type == 'application/javascript') \
or mime_type.startswith('model/') \
or mime_type.startswith('audio/') \
or mime_type.startswith('video/'):
if part.get_content_disposition() == 'inline':
print('Extracting inline part... Message id = %s, Content-type = %s.' % (mid, mime_type))
else:
print('Other Content-disposition... Message id = %s, Content-type = %s.' % (mid, mime_type))
save(mid, part, attachments_counter)
elif prefs['extract_inline_images'] and mime_type.startswith('image/'):
save(mid, part, attachments_counter, True)
elif part.is_multipart():
for p in part.get_payload():
check_part(mid, p, attachments_counter)
def process_message(mid):
msg = mb.get_message(mid)
if msg.is_multipart():
attachments_counter = {
'value': 0,
'inline_image': 0,
'file_paths': []
}
for part in msg.get_payload():
check_part(mid, part, attachments_counter)
for i in range(prefs['start'], prefs['stop']):
try:
process_message(i)
except KeyError:
print('The whole mbox file was processed.')
break
if i % 1000 == 0:
print('Messages processed: {}'.format(i))
print()
print('Total: %s' % (total))
print('Failed: %s' % (failed))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment