Skip to content

Instantly share code, notes, and snippets.

@dmaynor
Last active September 19, 2023 17:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dmaynor/962064b0ccefb73eab376a1dcccc804d to your computer and use it in GitHub Desktop.
Save dmaynor/962064b0ccefb73eab376a1dcccc804d to your computer and use it in GitHub Desktop.
import pypff
import argparse
import os
import hashlib
import concurrent.futures
import re
import markdown
import traceback
from collections import defaultdict
from datetime import datetime
# Debug logging is off by default; the command-line entry point turns it on
# when the -d/--debug flag is given.
DEBUG_ENABLED = False
# Path of the log file that log_debug() appends to; None until debug mode
# assigns it.
DEBUG_FILE = None
def log_debug(message):
    """Append a timestamped line to the debug log when debug mode is on.

    Does nothing unless the module-level ``DEBUG_ENABLED`` flag is true and
    ``DEBUG_FILE`` names a log file, so it is safe to call unconditionally.

    Args:
        message: Text to record; written as ``"<timestamp> - <message>"``.
    """
    # Guard first: disabled runs skip the timestamp formatting entirely, and
    # an unset log path can no longer reach open(None).
    if not DEBUG_ENABLED or DEBUG_FILE is None:
        return
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    with open(DEBUG_FILE, 'a', encoding='utf-8') as f:
        f.write(f"{timestamp} - {message}\n")
def contains_email(message, email_address):
    """Report whether *email_address* appears in a message body.

    The HTML body is checked first, then the plain-text body. Matching is a
    case-insensitive search for the address as literal text.

    Args:
        message: Object exposing ``html_body`` and ``plain_text_body``; each
            may be ``None``, ``bytes`` (decoded as UTF-8, ignoring errors) or
            already-decoded ``str``.
        email_address: Address to look for.

    Returns:
        True if either body contains the address, otherwise False.
    """
    # Escape the address: used raw as a pattern, "." matches any character and
    # "+" is a quantifier, which yields false positives and missed matches.
    pattern = re.compile(re.escape(email_address), re.IGNORECASE)
    for attribute in ('html_body', 'plain_text_body'):
        body = getattr(message, attribute)
        if not body:
            continue
        if isinstance(body, bytes):
            body = body.decode('utf-8', errors='ignore')
        if pattern.search(body):
            return True
    return False
def process_folder(folder, email_address, destination_folder):
    """Walk *folder* depth-first and save each message mentioning the address.

    Every message in the folder is checked with ``contains_email``; matches
    are written out through ``save_email_to_file``. Sub-folders are then
    visited recursively. Progress is reported through ``log_debug``.
    """
    log_debug(f"Processing folder: {folder.name}")

    for message in folder.sub_messages:
        log_debug(f"Processing email with Subject: {message.subject}")
        if not contains_email(message, email_address):
            continue
        filename = save_email_to_file(message, destination_folder)
        log_debug(f"Found email in Message with Subject: {message.subject}. Saved as {filename}.")

    for child in folder.sub_folders:
        process_folder(child, email_address, destination_folder)
def parse_pst(pst_file_path, email_address, destination_folder):
    """Open a PST file and save every message that mentions *email_address*.

    Args:
        pst_file_path: Path of the PST file to read.
        email_address: Address to search message bodies for.
        destination_folder: Folder that matching messages are saved into.
    """
    # Drive the file through explicit open()/close(), as the libpff Python
    # examples do, rather than a ``with`` block: pypff.file is not documented
    # to implement the context-manager protocol. The finally clause still
    # guarantees the handle is released if traversal fails.
    pst_file = pypff.file()
    pst_file.open(pst_file_path)
    try:
        process_folder(pst_file.root_folder, email_address, destination_folder)
    finally:
        pst_file.close()
def extract_emails_from_message(message):
    """Return every email address found in a message's transport headers.

    Args:
        message: Object exposing ``transport_headers`` as ``str``, ``bytes``
            or ``None``.

    Returns:
        List of addresses in order of appearance, duplicates included; empty
        when the message has no transport headers.
    """
    headers = message.transport_headers
    if not headers:
        return []
    # Other code in this file handles transport_headers as text, so decode
    # only when bytes are actually supplied instead of calling .decode() on a
    # str, which would raise AttributeError.
    if isinstance(headers, bytes):
        headers = headers.decode('utf-8', errors='ignore')
    return re.findall(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", headers)
def extract_emails_from_folder(folder, email_dict):
    """Tally addresses from *folder* and all its sub-folders into *email_dict*.

    ``email_dict`` is updated in place: every address returned by
    ``extract_emails_from_message`` has its count incremented by one.
    """
    for message in folder.sub_messages:
        for address in extract_emails_from_message(message):
            email_dict[address] += 1

    for child in folder.sub_folders:
        extract_emails_from_folder(child, email_dict)
def parse_pst_for_extraction(pst_file_path):
    """Count how often each email address occurs in a PST file's headers.

    Args:
        pst_file_path: Path of the PST file to read.

    Returns:
        ``defaultdict(int)`` mapping each address to its number of occurrences.
    """
    email_dict = defaultdict(int)
    # Explicit open()/close() instead of ``with``: pypff.file is not
    # documented to implement the context-manager protocol. The finally
    # clause releases the handle even if traversal raises.
    pst_file = pypff.file()
    pst_file.open(pst_file_path)
    try:
        extract_emails_from_folder(pst_file.root_folder, email_dict)
    finally:
        pst_file.close()
    return email_dict
def write_emails_to_md(email_dict, destination_folder):
    """Write address counts to ``emails.md`` and its HTML form to ``emails.html``.

    Args:
        email_dict: Mapping of email address to occurrence count.
        destination_folder: Folder that receives both files; created if it
            does not exist yet.

    Each line of ``emails.md`` is ``"<address>: <count>"``, most frequent
    address first. ``emails.html`` is that same text passed through
    ``markdown.markdown``.
    """
    ranked = sorted(email_dict.items(), key=lambda item: item[1], reverse=True)
    content = "".join(f"{email}: {count}\n" for email, count in ranked)

    # Callers are not guaranteed to have created the folder beforehand.
    os.makedirs(destination_folder, exist_ok=True)

    with open(os.path.join(destination_folder, 'emails.md'), 'w', encoding='utf-8') as f:
        f.write(content)

    # Render from the in-memory text rather than re-reading the file that was
    # just written.
    html_content = markdown.markdown(content)
    with open(os.path.join(destination_folder, 'emails.html'), 'w', encoding='utf-8') as f:
        f.write(html_content)
if __name__ == "__main__":
    # Command-line entry point: either tally every address in the PST (-a) or
    # save the messages that mention one address (-e), optionally writing a
    # debug log (-d) that is also rendered to debug.html.
    parser = argparse.ArgumentParser(description='Parse an Outlook PST file.')
    parser.add_argument('pst_path', type=str, help='Path to the PST file to parse')
    parser.add_argument('destination_folder', type=str, help='Destination folder to save results')
    parser.add_argument('-e', '--email_address', type=str, help='Email address to search for', default=None)
    parser.add_argument('-a', '--all_emails', action='store_true', help='Extract all email addresses from To/From/CC/BCC fields')
    parser.add_argument('-d', '--debug', action='store_true', help='Enable debug mode and store debug information')
    args = parser.parse_args()

    # Without a mode there is nothing to do; report that instead of exiting
    # silently with success.
    if not (args.all_emails or args.email_address):
        parser.error("nothing to do: pass -a/--all_emails or -e/--email_address")

    # Results are written here in every mode, so create the folder up front
    # rather than only when debugging.
    os.makedirs(args.destination_folder, exist_ok=True)

    if args.debug:
        DEBUG_ENABLED = True
        DEBUG_FILE = os.path.join(args.destination_folder, 'debug.md')
        # Start each run with a fresh log; log_debug() appends to it.
        with open(DEBUG_FILE, 'w', encoding='utf-8') as f:
            f.write("# Debug Information\n\n")
        log_debug(f"Debug mode enabled. Processing PST file: {args.pst_path}")
        pst_file_size = os.path.getsize(args.pst_path)
        log_debug(f"PST file size: {pst_file_size} bytes")

    failure = None
    try:
        if args.all_emails:
            log_debug("Extracting all email addresses from the PST file.")
            email_dict = parse_pst_for_extraction(args.pst_path)
            write_emails_to_md(email_dict, args.destination_folder)
        elif args.email_address:
            log_debug(f"Searching for email address: {args.email_address}")
            parse_pst(args.pst_path, args.email_address, args.destination_folder)
    except Exception as e:
        log_debug(f"Exception occurred: {str(e)}")
        log_debug(traceback.format_exc())
        # Remember the failure: log_debug() is a no-op without -d, so the
        # error would otherwise vanish without a trace.
        failure = e

    if DEBUG_ENABLED:
        with open(DEBUG_FILE, 'r', encoding='utf-8') as f:
            content = f.read()
        html_content = markdown.markdown(content)
        with open(os.path.join(args.destination_folder, 'debug.html'), 'w', encoding='utf-8') as f:
            f.write(html_content)

    if failure is not None:
        # Report on stderr and exit non-zero, after the debug log has been
        # rendered so it is complete.
        parser.exit(1, f"Error while processing {args.pst_path}: {failure}\n")
def get_destination_folder(base_folder):
    """Returns the next available folder based on the provided base folder.

    If *base_folder* does not exist it is created and returned unchanged.
    Otherwise a numeric suffix is appended (``out`` -> ``out1``, ``out2``, ...)
    or, when the name already ends in digits, that number is incremented with
    its zero padding preserved (``run07`` -> ``run08``). The first name that
    is not taken is created and returned.

    Args:
        base_folder: Preferred folder path.

    Returns:
        Path of the folder that was created for this run.
    """
    if not os.path.exists(base_folder):
        os.makedirs(base_folder)
        return base_folder

    # Split off any trailing digits so an existing numbered folder continues
    # its own sequence instead of getting a second number appended.
    stem = base_folder.rstrip("0123456789")
    digits = base_folder[len(stem):]
    counter = int(digits) + 1 if digits else 1

    while True:
        candidate = f"{stem}{str(counter).zfill(len(digits))}"
        try:
            # Creating the folder doubles as the availability check, so an
            # existing name is skipped without a separate exists() call.
            os.makedirs(candidate)
        except FileExistsError:
            counter += 1
        else:
            return candidate
def get_email_identifier(email):
    """Generate a unique identifier for the email based on its content.

    The identifier is the SHA-256 hex digest of the HTML body. When there is
    no HTML body the plain-text body is hashed instead, and a message with
    neither hashes as empty content.

    Args:
        email: Object exposing ``html_body`` (``bytes``, ``str`` or ``None``)
            and optionally ``plain_text_body``.

    Returns:
        64-character hexadecimal digest string.
    """
    content = email.html_body
    if content is None:
        # html_body can be None (the body checks elsewhere in this file guard
        # against it); fall back rather than failing on None.encode().
        content = getattr(email, 'plain_text_body', None)
    if content is None:
        content = b""
    if isinstance(content, str):
        content = content.encode('utf-8')
    return hashlib.sha256(content).hexdigest()
def contains_email(message, email_address, destination_folder):
    """Check if the email address is in the transport headers of the message.

    The comparison is a case-insensitive substring test. A matching message
    is also saved to *destination_folder* via ``save_email_to_file``.

    NOTE: this three-argument definition shares its name with the
    two-argument ``contains_email`` defined earlier in the file and replaces
    it once this statement has executed.

    Args:
        message: Object exposing ``transport_headers`` (``str``, ``bytes`` or
            ``None``).
        email_address: Address to look for.
        destination_folder: Folder a matching message is saved into.

    Returns:
        True if the address was found (and the message saved), else False.
    """
    headers = message.transport_headers
    if headers is None:
        return False
    if isinstance(headers, bytes):
        headers = headers.decode('utf-8', errors='ignore')
    if email_address.lower() not in headers.lower():
        # Explicit False instead of falling off the end and returning None.
        return False
    save_email_to_file(message, destination_folder)
    return True
def parse_folder(folder, email_address, destination_folder):
    """Recursively parses a folder and its subfolders searching for a specific email address.

    Args:
        folder: Object exposing ``name``, ``sub_messages`` and ``sub_folders``.
        email_address: Address to look for.
        destination_folder: Folder that matching messages are saved into.

    Prints the name of every folder visited and the subject of every match.
    """
    print("Folder:", folder.name)
    # Iterate through messages in the folder
    for message in folder.sub_messages:
        if contains_email(message, email_address, destination_folder):
            print("Found email in Message with Subject:", message.subject)
    # Recursively iterate through subfolders
    for subfolder in folder.sub_folders:
        parse_folder(subfolder, email_address, destination_folder)
def parse_pst(pst_path, email_address, destination_folder):
    """Parses the given PST file searching for a specific email address.

    Args:
        pst_path: Path of the PST file to read.
        email_address: Address to look for.
        destination_folder: Folder that matching messages are saved into.
    """
    pst_file = pypff.file()
    pst_file.open(pst_path)
    try:
        parse_folder(pst_file.root_folder, email_address, destination_folder)
    finally:
        # Release the file handle even when traversal raises.
        pst_file.close()
def save_email_to_file(email, destination_folder):
    """Saves the email to a file in the specified folder.

    The message body (HTML when present, otherwise plain text) is written to
    ``<sha256-of-body>.html`` inside *destination_folder*, and the index is
    then updated through ``update_index_file``.

    Args:
        email: Object exposing ``html_body`` and optionally
            ``plain_text_body`` (``bytes``, ``str`` or ``None``).
        destination_folder: Folder to write into; created if missing.

    Returns:
        The file name (not the full path) the email was saved under, so that
        callers can report it.
    """
    content = email.html_body or getattr(email, 'plain_text_body', None) or b""
    if isinstance(content, str):
        content = content.encode('utf-8')

    # Content-derived name: saving the same body twice overwrites one file
    # instead of piling up duplicates.
    html_filename = f"{hashlib.sha256(content).hexdigest()}.html"

    os.makedirs(destination_folder, exist_ok=True)
    with open(os.path.join(destination_folder, html_filename), 'wb') as f:
        f.write(content)

    update_index_file(destination_folder, email, html_filename)
    return html_filename
def update_index_file(destination_folder, email, html_filename):
    """Appends a line to the index.md file with the email's date and link to the HTML file.

    Args:
        destination_folder: Folder containing (or to contain) ``index.md``.
        email: Message the entry describes.
        html_filename: Name of the saved HTML file, used as the link target.

    The appended line is a markdown list item:
    ``- <date>: [<subject>](<html_filename>)``.
    """
    index_file_path = os.path.join(destination_folder, "index.md")

    # NOTE(review): assumes the message exposes its date as ``delivery_time``
    # and its title as ``subject`` — confirm against the pypff message API.
    date = getattr(email, 'delivery_time', None)
    if date is None:
        date_text = "unknown date"
    elif hasattr(date, 'strftime'):
        date_text = date.strftime('%Y-%m-%d %H:%M:%S')
    else:
        date_text = str(date)

    subject = getattr(email, 'subject', None) or "(no subject)"
    # Escape square brackets so a subject cannot break the markdown link.
    subject = subject.replace('[', '\\[').replace(']', '\\]')

    with open(index_file_path, 'a', encoding='utf-8') as f:
        f.write(f"- {date_text}: [{subject}]({html_filename})\n")
def extract_email_addresses_from_header(header):
    """Extract email addresses from a header string.

    Args:
        header: Header text to scan; ``None`` or empty yields no addresses.

    Returns:
        List of addresses in order of appearance, duplicates included.
    """
    if not header:
        return []
    # The local part accepts "+" and "%" so plus-addressed mail
    # (user+tag@host) is not cut down to "tag@host". The domain is a run of
    # dot-separated labels, so sentence punctuation such as a trailing "." is
    # left out of the match.
    return re.findall(r'[\w.%+-]+@[\w-]+(?:\.[\w-]+)*', header)
def _parse_address_headers(headers):
    """Map each of to/from/cc/bcc to the set of addresses in that header field."""
    found = {field: set() for field in ("to", "from", "cc", "bcc")}
    if not headers:
        return found
    if isinstance(headers, bytes):
        headers = headers.decode('utf-8', errors='ignore')
    # Lower-case for case-insensitive parsing, then unfold continuation lines
    # (a line break followed by whitespace) so a long recipient list is read
    # as a single field.
    unfolded = re.sub(r"\r?\n[ \t]+", " ", headers.lower())
    for line in unfolded.splitlines():
        name, separator, value = line.partition(":")
        if not separator:
            continue
        # Compare the whole field name: a substring test would also hit
        # "reply-to:" for "to:" and "bcc:" for "cc:".
        name = name.strip()
        if name in found:
            found[name].update(extract_email_addresses_from_header(value))
    return found


def extract_all_email_addresses(pst_path, destination_folder):
    """Extract all email addresses from the given PST file.

    Walks every folder of the PST, collects the addresses found in the To,
    From, CC and BCC transport headers of each message, and writes them to
    ``email.md`` in *destination_folder* as one sorted bullet list per field.

    Args:
        pst_path: Path of the PST file to read.
        destination_folder: Existing folder that receives ``email.md``.
    """
    addresses = {field: set() for field in ("to", "from", "cc", "bcc")}

    def walk(folder):
        for message in folder.sub_messages:
            parsed = _parse_address_headers(message.transport_headers)
            for field, values in parsed.items():
                addresses[field].update(values)
        for subfolder in folder.sub_folders:
            walk(subfolder)

    pst_file = pypff.file()
    pst_file.open(pst_path)
    try:
        walk(pst_file.root_folder)
    finally:
        # Release the file handle even when traversal raises.
        pst_file.close()

    # Write results to email.md
    md_path = os.path.join(destination_folder, "email.md")
    sections = (("To", "to"), ("From", "from"), ("CC", "cc"), ("BCC", "bcc"))
    with open(md_path, 'w', encoding='utf-8') as md_file:
        for position, (title, field) in enumerate(sections):
            if position:
                # Blank line so the next heading is not read as part of the
                # preceding list.
                md_file.write("\n")
            md_file.write(f"## {title} Addresses\n")
            for addr in sorted(addresses[field]):
                md_file.write(f"- {addr}\n")
def convert_md_to_html_file(destination_folder, source_filename, target_filename):
    """Render one markdown file from *destination_folder* as HTML beside it.

    Reads ``source_filename``, converts its text with ``markdown.markdown``
    and writes the result to ``target_filename``. Both names are resolved
    relative to ``destination_folder`` and both files are UTF-8.
    """
    source_path, output_path = (
        os.path.join(destination_folder, name)
        for name in (source_filename, target_filename)
    )

    with open(source_path, 'r', encoding='utf-8') as source:
        rendered = markdown.markdown(source.read())

    with open(output_path, 'w', encoding='utf-8') as output:
        output.write(rendered)
# Argument parsing
# NOTE(review): this trailing section looks like the entry point of a separate
# version of the script; its options differ from the parser defined earlier in
# the file, so the two cannot share one command line — confirm which to keep.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Search for an email address in an Outlook PST file or extract all addresses.')
    parser.add_argument('base_destination_folder', type=str, help='Base destination folder to save results')
    # Declared because the mode selection below reads args.email_address.
    parser.add_argument('-e', '--email_address', type=str, default=None, help='Email address to search for')
    parser.add_argument('-a', '--all_addresses', action='store_true', help='Extract all email addresses')
    args = parser.parse_args()

    # Determine the actual destination folder for this run; fall back to the
    # base folder should the helper return nothing.
    actual_destination_folder = get_destination_folder(args.base_destination_folder) or args.base_destination_folder

    # Create index.md or email.md if they don't exist based on the mode
    if args.email_address:
        index_file_path = os.path.join(actual_destination_folder, "index.md")
    elif args.all_addresses:
        index_file_path = os.path.join(actual_destination_folder, "email.md")
    else:
        index_file_path = None

    if index_file_path is not None and not os.path.exists(index_file_path):
        # Append mode creates the file without truncating one that appears in
        # the meantime.
        open(index_file_path, 'a', encoding='utf-8').close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment