Last active
September 19, 2023 17:33
-
-
Save dmaynor/962064b0ccefb73eab376a1dcccc804d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pypff | |
import argparse | |
import os | |
import hashlib | |
import concurrent.futures | |
import re | |
import markdown | |
import traceback | |
from collections import defaultdict | |
from datetime import datetime | |
# Debug logging is off by default. The command-line entry point switches it
# on and points DEBUG_FILE at the log file when --debug is given.
DEBUG_ENABLED = False
DEBUG_FILE = None


def log_debug(message):
    """Append a timestamped line to the debug log.

    Does nothing unless ``DEBUG_ENABLED`` is true and ``DEBUG_FILE`` names a
    log file, so callers can log unconditionally.

    Args:
        message: Text to record; written as ``"<timestamp> - <message>"``.
    """
    # Guard on DEBUG_FILE as well: enabling debugging without a target path
    # would otherwise end in open(None), which raises TypeError.
    if not DEBUG_ENABLED or DEBUG_FILE is None:
        return
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    with open(DEBUG_FILE, 'a', encoding='utf-8') as f:
        f.write(f"{timestamp} - {message}\n")
def contains_email(message, email_address):
    """Report whether ``email_address`` occurs in either body of ``message``.

    The HTML body is checked first, then the plain-text body. Matching is
    case-insensitive and literal (the address is not treated as a regex).

    Args:
        message: Object exposing ``html_body`` and ``plain_text_body``; each
            may be bytes, str, or empty/None.
        email_address: Address to look for.

    Returns:
        True if the address is found in either body, otherwise False.
    """
    # Escape the address: used as a raw pattern, '.' would match any
    # character and '+' would act as a quantifier.
    pattern = re.compile(re.escape(email_address), re.IGNORECASE)
    # Look the bodies up one at a time so the plain-text body is only read
    # when the HTML body did not match.
    for attribute in ("html_body", "plain_text_body"):
        raw_body = getattr(message, attribute)
        if not raw_body:
            continue
        if isinstance(raw_body, bytes):
            text = raw_body.decode('utf-8', errors='ignore')
        else:
            text = raw_body
        if pattern.search(text):
            return True
    return False
def process_folder(folder, email_address, destination_folder):
    """Walk ``folder`` and its subfolders, saving every matching message.

    Each message is tested with ``contains_email``; matches are written out
    with ``save_email_to_file``. Progress is reported through ``log_debug``.

    Args:
        folder: Folder object exposing ``name``, ``sub_messages`` and
            ``sub_folders``.
        email_address: Address to search for.
        destination_folder: Folder that receives the saved messages.
    """
    log_debug(f"Processing folder: {folder.name}")

    for item in folder.sub_messages:
        log_debug(f"Processing email with Subject: {item.subject}")
        if not contains_email(item, email_address):
            continue
        saved_as = save_email_to_file(item, destination_folder)
        log_debug(f"Found email in Message with Subject: {item.subject}. Saved as {saved_as}.")

    # Descend only after this folder's own messages have been handled.
    for child in folder.sub_folders:
        process_folder(child, email_address, destination_folder)
def parse_pst(pst_file_path, email_address, destination_folder):
    """Open a PST file and search all of its folders for ``email_address``.

    Args:
        pst_file_path: Path of the PST file to open.
        email_address: Address to search for.
        destination_folder: Folder that receives the saved messages.
    """
    # Open and close explicitly rather than via ``with pypff.file()``: the
    # handle is released in ``finally``, so this does not depend on the
    # object supporting the context-manager protocol.
    pst_file = pypff.file()
    pst_file.open(pst_file_path)
    try:
        process_folder(pst_file.root_folder, email_address, destination_folder)
    finally:
        pst_file.close()
def extract_emails_from_message(message):
    """Return every email address found in a message's transport headers.

    Args:
        message: Object exposing ``transport_headers`` (str, bytes or None).

    Returns:
        List of addresses in order of appearance. Duplicates are kept so the
        caller can count occurrences; the list is empty when the message has
        no headers.
    """
    headers = message.transport_headers
    if not headers:
        return []
    # Headers may arrive as bytes or as already-decoded text; calling
    # .decode() on a str would raise AttributeError, so only decode bytes.
    if isinstance(headers, bytes):
        headers = headers.decode('utf-8', errors='ignore')
    return re.findall(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", headers)
def extract_emails_from_folder(folder, email_dict):
    """Tally the addresses seen in ``folder`` and all of its subfolders.

    Args:
        folder: Folder object exposing ``sub_messages`` and ``sub_folders``.
        email_dict: Mapping of address to occurrence count, updated in
            place; it must supply a default of 0 for unseen addresses.
    """
    for msg in folder.sub_messages:
        for address in extract_emails_from_message(msg):
            email_dict[address] += 1

    for child in folder.sub_folders:
        extract_emails_from_folder(child, email_dict)
def parse_pst_for_extraction(pst_file_path):
    """Count every email address found in the headers of a PST file.

    Args:
        pst_file_path: Path of the PST file to open.

    Returns:
        ``defaultdict(int)`` mapping each address to its occurrence count.
    """
    email_dict = defaultdict(int)
    # Open and close explicitly rather than via ``with pypff.file()``: the
    # handle is released in ``finally``, so this does not depend on the
    # object supporting the context-manager protocol.
    pst_file = pypff.file()
    pst_file.open(pst_file_path)
    try:
        extract_emails_from_folder(pst_file.root_folder, email_dict)
    finally:
        pst_file.close()
    return email_dict
def write_emails_to_md(email_dict, destination_folder):
    """Write the address counts to ``emails.md`` and render ``emails.html``.

    Addresses are listed one per line as ``"<address>: <count>"``, most
    frequent first.

    Args:
        email_dict: Mapping of address to occurrence count.
        destination_folder: Folder that receives both files; created if it
            does not exist yet.
    """
    # sorted() is stable, so addresses with equal counts keep the mapping's
    # own order.
    sorted_emails = sorted(email_dict.items(), key=lambda item: item[1], reverse=True)
    content = "".join(f"{email}: {count}\n" for email, count in sorted_emails)

    # Without this the first open() below fails when the folder is missing.
    os.makedirs(destination_folder, exist_ok=True)

    with open(os.path.join(destination_folder, 'emails.md'), 'w', encoding='utf-8') as f:
        f.write(content)

    # Render from the text already in memory instead of reading the file
    # back in.
    html_content = markdown.markdown(content)
    with open(os.path.join(destination_folder, 'emails.html'), 'w', encoding='utf-8') as f:
        f.write(html_content)
if __name__ == "__main__":
    # Command-line entry point: search a PST file for one address (-e) or
    # list every address found in its headers (-a), optionally keeping a
    # debug log (-d).
    #
    # NOTE(review): this block sits above several definitions further down
    # the file (for example save_email_to_file), so those names are not yet
    # bound when it runs. Consider moving the entry point to the end of the
    # file.
    parser = argparse.ArgumentParser(description='Parse an Outlook PST file.')
    parser.add_argument('pst_path', type=str, help='Path to the PST file to parse')
    parser.add_argument('destination_folder', type=str, help='Destination folder to save results')
    parser.add_argument('-e', '--email_address', type=str, help='Email address to search for', default=None)
    parser.add_argument('-a', '--all_emails', action='store_true', help='Extract all email addresses from To/From/CC/BCC fields')
    parser.add_argument('-d', '--debug', action='store_true', help='Enable debug mode and store debug information')
    args = parser.parse_args()

    # Every mode writes into the destination folder, so create it up front.
    # The earlier code referenced an undefined ``destination_folder`` name
    # here and only attempted this in debug mode.
    os.makedirs(args.destination_folder, exist_ok=True)

    if args.debug:
        DEBUG_ENABLED = True
        DEBUG_FILE = os.path.join(args.destination_folder, 'debug.md')
        # Start each run with a fresh log file.
        with open(DEBUG_FILE, 'w', encoding='utf-8') as f:
            f.write("# Debug Information\n\n")
        log_debug(f"Debug mode enabled. Processing PST file: {args.pst_path}")
        pst_file_size = os.path.getsize(args.pst_path)
        log_debug(f"PST file size: {pst_file_size} bytes")

    try:
        if args.all_emails:
            log_debug("Extracting all email addresses from the PST file.")
            email_dict = parse_pst_for_extraction(args.pst_path)
            write_emails_to_md(email_dict, args.destination_folder)
        elif args.email_address:
            log_debug(f"Searching for email address: {args.email_address}")
            parse_pst(args.pst_path, args.email_address, args.destination_folder)
        else:
            # Neither mode was requested; say so instead of exiting silently.
            print("Nothing to do: pass -e/--email_address or -a/--all_emails.")
    except Exception as e:
        log_debug(f"Exception occurred: {str(e)}")
        log_debug(traceback.format_exc())
        # log_debug is a no-op without --debug, so also report on stderr;
        # otherwise a failed run would look like a successful one.
        traceback.print_exc()

    if DEBUG_ENABLED:
        # Publish an HTML rendering of the debug log next to it.
        with open(DEBUG_FILE, 'r', encoding='utf-8') as f:
            content = f.read()
        html_content = markdown.markdown(content)
        with open(os.path.join(args.destination_folder, 'debug.html'), 'w', encoding='utf-8') as f:
            f.write(html_content)
def get_destination_folder(base_folder):
    """Returns the next available folder based on the provided base folder.

    If ``base_folder`` does not exist it is created and returned unchanged.
    Otherwise a numeric suffix is tried until an unused name is found, and
    that folder is created and returned: ``out`` -> ``out1`` -> ``out2``.
    A base that already ends in digits continues counting from that number
    (``run7`` -> ``run8``), keeping any zero padding.

    Args:
        base_folder: Preferred folder path.

    Returns:
        Path of the folder that was created for this run.
    """
    if not os.path.exists(base_folder):
        os.makedirs(base_folder)
        return base_folder

    trailing_number = re.search(r"\d+$", base_folder)
    if trailing_number is None:
        # If the base folder exists but doesn't end in a number, start
        # checking with appended "1".
        stem = base_folder
        counter = 1
        width = 0
    else:
        stem = base_folder[:trailing_number.start()]
        counter = int(trailing_number.group()) + 1
        width = len(trailing_number.group())

    new_folder = f"{stem}{counter:0{width}d}"
    while os.path.exists(new_folder):
        counter += 1
        new_folder = f"{stem}{counter:0{width}d}"

    os.makedirs(new_folder)
    return new_folder
def get_email_identifier(email):
    """Generate an identifier for the email based on its content.

    The identifier is the SHA-256 hex digest of the HTML body. When the
    message has no HTML body the plain-text body is hashed instead, and a
    message with neither hashes as empty content (so all such messages
    share one identifier).

    Args:
        email: Object exposing ``html_body`` and, optionally,
            ``plain_text_body``; each may be bytes, str or None.

    Returns:
        64-character hexadecimal digest string.
    """
    content = email.html_body
    # A missing HTML body used to crash on ``None.encode``; fall back to the
    # plain-text body, then to empty content.
    if content is None:
        content = getattr(email, "plain_text_body", None)
    if content is None:
        content = b""
    if not isinstance(content, bytes):
        content = content.encode('utf-8')
    return hashlib.sha256(content).hexdigest()
def contains_email(message, email_address, destination_folder):
    """Check if the email address is in the transport headers of the message.

    The comparison is case-insensitive. On a match the message is also
    saved to ``destination_folder`` via ``save_email_to_file``.

    Args:
        message: Object exposing ``transport_headers`` (str, bytes or None).
        email_address: Address to look for.
        destination_folder: Folder that receives a matching message.

    Returns:
        True if the address was found (and the message saved), else False.
    """
    headers = message.transport_headers
    if headers is None:
        return False
    # Mixing bytes headers with a str address would raise TypeError in the
    # ``in`` test below, so normalise to text first.
    if isinstance(headers, bytes):
        headers = headers.decode('utf-8', errors='ignore')
    if email_address.lower() not in headers.lower():
        # Explicit False rather than falling off the end with None.
        return False
    # Saving happens here, as part of the check, not in the caller.
    save_email_to_file(message, destination_folder)
    return True
def parse_folder(folder, email_address, destination_folder):
    """Recursively parses a folder and its subfolders searching for a specific email address.

    Prints the name of each folder visited and the subject of each message
    for which ``contains_email`` reports a match.

    Args:
        folder: Folder object exposing ``name``, ``sub_messages`` and
            ``sub_folders``.
        email_address: Address to search for.
        destination_folder: Folder passed on to ``contains_email`` for
            saving matches.
    """
    print("Folder:", folder.name)

    # Iterate through messages in the folder. The loop header was missing,
    # which left ``message`` undefined.
    for message in folder.sub_messages:
        if contains_email(message, email_address, destination_folder):
            print("Found email in Message with Subject:", message.subject)

    # Recursively iterate through subfolders. The loop header was missing
    # here too, which left ``subfolder`` undefined.
    for subfolder in folder.sub_folders:
        parse_folder(subfolder, email_address, destination_folder)
def parse_pst(pst_path, email_address, destination_folder):
    """Parses the given PST file searching for a specific email address.

    Args:
        pst_path: Path of the PST file to open.
        email_address: Address to search for.
        destination_folder: Folder that receives matching messages.
    """
    pst_file = pypff.file()
    pst_file.open(pst_path)
    try:
        parse_folder(pst_file.root_folder, email_address, destination_folder)
    finally:
        # Close the handle even if parsing raises part-way through.
        pst_file.close()
def save_email_to_file(email, destination_folder):
    """Saves the email to a file in the specified folder.

    NOTE(review): only the signature and this docstring are visible here.
    No statements follow, so as written the call does nothing and returns
    None. Another caller in this file logs the return value as the saved
    filename; confirm the intended body before relying on it.

    Args:
        email: Message to save (presumably a pypff message; TODO confirm).
        destination_folder: Folder the file should be written to.
    """
def update_index_file(destination_folder, email, html_filename):
    """Appends a line to the index.md file with the email's date and link to the HTML file.

    NOTE(review): the visible body only builds the path of ``index.md``.
    The append described above is not present in this code, and ``email``
    and ``html_filename`` are unused; confirm the intended body.

    Args:
        destination_folder: Folder that holds ``index.md``.
        email: Message the index entry is about.
        html_filename: Name of the HTML file the entry should link to.
    """
    # index.md lives directly inside the destination folder.
    index_file_path = os.path.join(destination_folder, "index.md")
def extract_email_addresses_from_header(header):
    """Extract email addresses from a header string.

    Args:
        header: Header text to scan; None or an empty string yields no
            addresses.

    Returns:
        List of addresses in order of appearance, duplicates included.
    """
    if not header:
        return []
    # The domain part of the pattern accepts '.' and '-', so a match can
    # swallow punctuation that merely follows the address ("a@b.com.").
    # Trim it so the same address is not reported in two spellings.
    return [
        candidate.rstrip('.-')
        for candidate in re.findall(r'[\w\.-]+@[\w\.-]+', header)
    ]
def _addresses_in_header_field(headers, field_name):
    """Return the set of addresses listed in every ``field_name`` header of ``headers``."""
    # Anchor the field name at the start of a line so "to:" does not match
    # inside "reply-to:" and "cc:" does not match inside "bcc:". Lines that
    # begin with a space or tab continue the previous header, so they are
    # part of the same value. No trailing newline is required.
    field_pattern = re.compile(
        rf"^{re.escape(field_name)}:(.*(?:\r?\n[ \t].*)*)",
        re.IGNORECASE | re.MULTILINE,
    )
    found = set()
    for field in field_pattern.finditer(headers):
        found.update(extract_email_addresses_from_header(field.group(1)))
    return found


def extract_all_email_addresses(pst_path, destination_folder):
    """Extract all email addresses from the given PST file.

    Collects the addresses in the To, From, CC and BCC transport headers of
    every message and writes them, lower-cased and sorted, to ``email.md``
    in ``destination_folder`` under one heading per field.

    Args:
        pst_path: Path of the PST file to open.
        destination_folder: Existing folder that receives ``email.md``.
    """
    addresses = {"to": set(), "from": set(), "cc": set(), "bcc": set()}

    def process_message(message):
        headers = message.transport_headers or ""
        if isinstance(headers, bytes):
            headers = headers.decode('utf-8', errors='ignore')
        headers = headers.lower()  # Case-insensitive parsing
        for field_name, bucket in addresses.items():
            bucket.update(_addresses_in_header_field(headers, field_name))

    def process_folder(folder):
        for message in folder.sub_messages:
            process_message(message)
        for subfolder in folder.sub_folders:
            process_folder(subfolder)

    pst_file = pypff.file()
    pst_file.open(pst_path)
    try:
        process_folder(pst_file.root_folder)
    finally:
        # Close the handle even if a message fails to parse.
        pst_file.close()

    # Write results to email.md. The From/CC/BCC sets used to be collected
    # and then dropped; they now get their own sections after "To".
    sections = (("To", "to"), ("From", "from"), ("CC", "cc"), ("BCC", "bcc"))
    md_path = os.path.join(destination_folder, "email.md")
    with open(md_path, 'w', encoding='utf-8') as md_file:
        for position, (title, key) in enumerate(sections):
            if position:
                # Blank line so the next heading is not glued to the list.
                md_file.write("\n")
            md_file.write(f"## {title} Addresses\n")
            for addr in sorted(addresses[key]):
                md_file.write(f"- {addr}\n")
def convert_md_to_html_file(destination_folder, source_filename, target_filename):
    """Render one markdown file from the destination folder as an HTML file.

    Args:
        destination_folder: Folder holding the source and receiving the
            result.
        source_filename: Name of the markdown file to read.
        target_filename: Name of the HTML file to write.
    """
    source_path = os.path.join(destination_folder, source_filename)
    with open(source_path, 'r', encoding='utf-8') as source:
        rendered = markdown.markdown(source.read())

    target_path = os.path.join(destination_folder, target_filename)
    with open(target_path, 'w', encoding='utf-8') as target:
        target.write(rendered)
# Argument parsing
# NOTE(review): this fragment builds a second parser but never calls
# parse_args() on it, and it registers neither a PST path nor an
# --email_address option. The ``args`` read below is therefore not produced
# by this parser; confirm what is missing before running it.
parser = argparse.ArgumentParser(description='Search for an email address in an Outlook PST file or extract all addresses.')
parser.add_argument('base_destination_folder', type=str, help='Base destination folder to save results')
parser.add_argument('-a', '--all_addresses', action='store_true', help='Extract all email addresses')
# Determine the actual destination folder for this run
actual_destination_folder = get_destination_folder(args.base_destination_folder)
# Create index.md or email.md if they don't exist based on the mode
# NOTE(review): only the path is computed here; the visible code never
# creates either file.
if args.email_address:
    index_file_path = os.path.join(actual_destination_folder, "index.md")
elif args.all_addresses:
    index_file_path = os.path.join(actual_destination_folder, "email.md")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment