@Jonathan727
Forked from urschrei/parseml.py
Last active September 30, 2022 19:36
Extract attachments from EML files in the current dir, and write them to the output subdir
#!/usr/bin/env python
"""
2022 update:
- recurse into attached eml files
2020 update:
- More iterators, fewer lists
- Python 3 compatible
- Processes files in parallel
  (one worker process per CPU)
"""
import glob
import os
import email
import email.message
import email.utils
from email import policy
from multiprocessing import Pool

EXTENSION = "eml"


def extract(filename):
    """
    Extract the attachments from a single EML file into the output subdir
    """
    # ensure that an output dir exists
    output_directory = "output"
    os.makedirs(output_directory, exist_ok=True)
    output_count = 0
    try:
        with open(filename, "r") as eml_file:
            msg = email.message_from_file(eml_file, policy=policy.default)
            output_count = process_email_message(filename, msg, output_directory)
        if output_count == 0:
            print("No attachment found for file %s!" % filename)
    # this should catch read and write errors
    except IOError:
        print("Problem with %s or one of its attachments!" % filename)
    # (1, attachment count) so the caller can tally files and attachments
    return 1, output_count


def process_email_message(name, msg, output_directory):
    output_count = 0
    # msg_to = msg['To']
    # msg_date = msg['Date']
    # msg_subject = msg['Subject']
    # msg_body = msg.get_body()
    # msg_body_text = msg_body.get_payload(decode=True)
    for attachment in msg.iter_attachments():
        # message/rfc822 attachments carry their nested message(s) in a list payload;
        # recurse into those before handling ordinary file attachments
        payloads = attachment.get_payload() if attachment.is_multipart() else []
        message_payloads = [payload for payload in payloads if isinstance(payload, email.message.EmailMessage)]
        for payload in message_payloads:
            # inner_to = payload['To']
            # inner_date = payload['Date']
            inner_subject = payload['Subject'] or "(no subject)"
            print(f"Processing nested email message subject '{payload['Subject']}' "
                  f"from '{payload['From']}' to '{payload['To']}' on '{payload['Date']}'")
            output_count += process_email_message(name + " → " + inner_subject, payload, output_directory)
        try:
            output_filename = attachment.get_filename()
        except AttributeError:
            print("Got string instead of filename for %s. Skipping." % name)
            continue
        # Skip attachments that carry no filename
        if output_filename:
            attachment_creation_date = attachment.get_param('creation-date', None, 'content-disposition')
            attachment_modification_date = attachment.get_param('modification-date', None, 'content-disposition')
            full_output_file_path = os.path.join(output_directory, output_filename)
            if os.path.exists(full_output_file_path):
                print("Existing file %s will be overwritten" % output_filename)
            with open(full_output_file_path, "wb") as of:
                try:
                    payload = attachment.get_payload(decode=True)
                    of.write(payload)
                    output_count += 1
                    # carry the attachment's modification-date (if any) over to the written file
                    if attachment_modification_date is not None:
                        attachment_modification_datetime = email.utils.parsedate_to_datetime(
                            attachment_modification_date)
                        set_file_last_modified(full_output_file_path, attachment_modification_datetime)
                except TypeError:
                    print("Couldn't get payload for %s" % output_filename)
    return output_count


def set_file_last_modified(file_path, dt):
    dt_epoch = dt.timestamp()
    os.utime(file_path, (dt_epoch, dt_epoch))


if __name__ == "__main__":
    # process the files in parallel, with one worker process per CPU
    pool = Pool(None)
    res = pool.map(extract, glob.iglob("*.%s" % EXTENSION))
    # close() and join() are only strictly required with map_async, but are harmless here
    pool.close()
    pool.join()
    # res holds (1, attachment_count) pairs; sum column-wise to get the totals
    number_of_files = [sum(i) for i in zip(*res)]
    print("Done: Processed {} files with {} attachments.".format(*number_of_files))
@Jonathan727 (Author)

Extracts nested eml attachments recursively
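
A rough sketch of what that recursion relies on, using only the standard-library EmailMessage API (all subjects, addresses and filenames below are invented): attaching one EmailMessage to another stores it as a message/rfc822 part, and get_payload() on that part returns a one-element list holding the inner message, which is what process_email_message() recurses into.

# Nested-eml illustration; every name here is a placeholder.
from email.message import EmailMessage

inner = EmailMessage()
inner["Subject"] = "Inner report"
inner["From"] = "alice@example.com"
inner["To"] = "bob@example.com"
inner["Date"] = "Fri, 30 Sep 2022 12:00:00 +0000"
inner.set_content("Report attached.")
inner.add_attachment("a,b\n1,2\n", subtype="csv", filename="report.csv")

outer = EmailMessage()
outer["Subject"] = "Forwarded report"
outer["From"] = "carol@example.com"
outer["To"] = "dave@example.com"
outer.set_content("Forwarding the report.")
outer.add_attachment(inner)  # stored as a message/rfc822 attachment

for part in outer.iter_attachments():
    print(part.get_content_type())                         # message/rfc822
    print([type(p).__name__ for p in part.get_payload()])  # ['EmailMessage']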
