Skip to content

Instantly share code, notes, and snippets.

@AlbertoEAF
Forked from benwattsjones/gmail_mbox_parser.py
Last active September 29, 2023 18:27
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save AlbertoEAF/0ef8251d85d405237dd71b7598102ed9 to your computer and use it in GitHub Desktop.
Save AlbertoEAF/0ef8251d85d405237dd71b7598102ed9 to your computer and use it in GitHub Desktop.
Extension of gmail_mbox_parser.py to parse GMail's Google Takeout exports in .mbox format and do clustering analysis of senders so you can quickly triage which kind of information sources might no longer be relevant. It also adds command line arguments and exports the sender's statistics in a .csv file.
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# About: Extension of gmail_mbox_parser.py to parse GMail's Google Takeout exports in .mbox format and do clustering analysis of senders so you can quickly triage which kind of information sources might no longer be relevant. It also adds command line arguments and exports the sender's statistics in a .csv file.
# Based on https://gist.github.com/benwattsjones/060ad83efd2b3afc8b229d41f9b246c4 but expanded to add command line arguments, do clustering of senders and export a .csv with those statistics.
import re
import argparse
import mailbox
from collections import Counter
import pandas as pd
import csv
import bs4
def parse_sender(sender):
    """Split a raw ``From`` header value into display name and e-mail address.

    Values containing ``<`` are parsed as ``Name <address>``; anything else is
    treated as a bare address that also serves as the display name.

    Args:
        sender: Raw sender string, e.g. ``"Alice <alice@example.com>"``.

    Returns:
        A dict with the keys ``"sender"`` (the input, unchanged),
        ``"sender_name"`` and ``"sender_email"`` (both stripped of surrounding
        whitespace). When the display name is empty or only whitespace, the
        e-mail address is used as the name.

    Raises:
        ValueError: If ``sender`` is not a string, or contains ``<`` but does
            not match the ``Name <address>`` shape.
    """
    try:
        if "<" in sender:
            sender_name, sender_email = re.match(
                r"^([^<]*)<([^>]+)>.*$", sender
            ).groups()
        else:
            sender_name = sender_email = sender
        # Strip before the emptiness check so that " <a@b.c>" falls back to the
        # address instead of ending up with an empty name.
        sender_name = sender_name.strip()
        sender_email = sender_email.strip()
    except (AttributeError, TypeError) as e:
        # AttributeError: the regex did not match (re.match returned None) or
        # the value has no .strip(); TypeError: `sender` is not a string.
        raise ValueError(
            f"Couldn't parse sender: '{sender}'. BaseException=<{e}>"
        ) from e
    if not sender_name:
        sender_name = sender_email
    return {
        "sender": sender,
        "sender_name": sender_name,
        "sender_email": sender_email,
    }
def get_html_text(html):
    """Return the visible text of an HTML document's ``<body>``.

    The markup is parsed with BeautifulSoup using the ``lxml`` parser; text
    fragments are stripped and joined with single spaces.

    Args:
        html: HTML markup to extract text from.

    Returns:
        The extracted text, or ``None`` when an ``AttributeError`` occurs
        during extraction (e.g. the parsed document has no body because the
        message contents are empty).
    """
    try:
        document = bs4.BeautifulSoup(html, "lxml")
        text = document.body.get_text(" ", strip=True)
    except AttributeError:  # message contents empty
        text = None
    return text
class GmailMboxMessage:
    """Wrapper around a ``mailbox.mboxMessage`` exposing headers and body text.

    Attributes set on construction:
        email_data: The wrapped ``mailbox.mboxMessage``.
        email_labels: Value of the ``X-Gmail-Labels`` header.
        email_date: Value of the ``Date`` header.
        email_from: ``From`` header coerced to ``str``.
        email_to: ``To`` header coerced to ``str``.
        email_subject: Value of the ``Subject`` header.
        email_text: ``None`` until :meth:`parse_email` is called; afterwards a
            list of ``(content_type, encoding, text)`` tuples, one per leaf
            part of the message.
    """

    def __init__(self, email_data):
        """Store the message and cache its most commonly used headers.

        Raises:
            TypeError: If ``email_data`` is not a ``mailbox.mboxMessage``.
        """
        if not isinstance(email_data, mailbox.mboxMessage):
            raise TypeError("Variable must be type mailbox.mboxMessage")
        self.email_data = email_data
        self.email_labels = self.email_data["X-Gmail-Labels"]
        self.email_date = self.email_data["Date"]
        # Coerced to str so string operations on the sender/recipient always
        # work; note that a missing header therefore becomes the text "None".
        self.email_from = str(self.email_data["From"])
        self.email_to = str(self.email_data["To"])
        self.email_subject = self.email_data["Subject"]
        # Defined up front so the attribute exists before parse_email() runs.
        self.email_text = None

    def parse_email(self):
        """Extract the body parts and store them in ``self.email_text``."""
        self.email_text = self.read_email_payload()

    def read_email_payload(self):
        """Return ``(content_type, encoding, text)`` for every leaf part.

        A non-multipart message contributes a single entry built from its raw
        payload.
        """
        email_payload = self.email_data.get_payload()
        if self.email_data.is_multipart():
            email_messages = list(self._get_email_messages(email_payload))
        else:
            email_messages = [email_payload]
        return [self._read_email_text(msg) for msg in email_messages]

    def _get_email_messages(self, email_payload):
        """Yield the leaf (non-multipart) parts of a nested payload, depth first."""
        for msg in email_payload:
            if isinstance(msg, (list, tuple)):
                yield from self._get_email_messages(msg)
            elif msg.is_multipart():
                yield from self._get_email_messages(msg.get_payload())
            else:
                yield msg

    def _read_email_text(self, msg):
        """Extract readable text from one message part.

        Args:
            msg: Either a message part object or, for non-multipart messages,
                the raw payload string.

        Returns:
            A ``(content_type, encoding, text)`` tuple. ``content_type`` and
            ``encoding`` are ``"NA"`` for raw string payloads. ``text`` is
            ``None`` for base64-encoded parts and for parts that are neither
            ``text/plain`` nor ``text/html``.
        """
        if isinstance(msg, str):
            # Raw payload without headers: run it through the HTML extractor.
            return ("NA", "NA", get_html_text(msg))

        content_type = msg.get_content_type()
        encoding = msg.get("Content-Transfer-Encoding", "NA")
        # Transfer-encoding tokens are case-insensitive, so "BASE64" must be
        # recognised too; otherwise raw base64 would be returned as text.
        is_base64 = "base64" in str(encoding).lower()

        if is_base64:
            msg_text = None
        elif "text/plain" in content_type:
            msg_text = msg.get_payload()
        elif "text/html" in content_type:
            msg_text = get_html_text(msg.get_payload())
        else:
            msg_text = None
        return (content_type, encoding, msg_text)
######################### End of library, example of use below
def parse_args(argv=None):
    """Parse the command-line arguments of the mbox analysis script.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default)
            the arguments are taken from ``sys.argv[1:]``.

    Returns:
        An ``argparse.Namespace`` with ``input`` (path of the .mbox file),
        ``exclude_senders`` (list of sender substrings, empty by default) and
        ``output_senders`` (path of the generated .csv summary).
    """
    # The title goes in `description`; ArgumentParser's first positional
    # parameter is `prog`, which would corrupt the usage line.
    parser = argparse.ArgumentParser(description="GMail Inbox MBox parser")
    parser.add_argument(
        "-i", "--input", required=True, help="Path of the .mbox file to read."
    )
    parser.add_argument(
        "--exclude-senders",
        default=[],
        nargs="*",
        help="Optional. If passed, excludes such e-mails with such sender(s) from the analysis.",
    )
    parser.add_argument(
        "--output-senders",
        default="_generated_mbox_senders_summary.csv",
        help="Path of the .csv file the senders summary is written to.",
    )
    return parser.parse_args(argv)
def read_mailbox_mails(mbox, exclude_sender_patterns=()):
    """Wrap every message of a mailbox, skipping excluded senders.

    Progress is printed every 100 messages.

    Args:
        mbox: Sized iterable of ``mailbox.mboxMessage`` objects (e.g. a
            ``mailbox.mbox``).
        exclude_sender_patterns: Iterable of substrings; a message is skipped
            when any of them occurs in its ``From`` header. ``None`` or an
            empty iterable disables filtering.

    Returns:
        A list of ``GmailMboxMessage`` objects for the messages that were kept.
    """
    # Immutable default instead of a shared mutable list; materialised once so
    # a one-shot iterable is not exhausted after the first message.
    patterns = tuple(exclude_sender_patterns or ())
    num_entries = len(mbox)
    emails = []
    for idx, email_obj in enumerate(mbox):
        if idx % 100 == 0:
            print(f"Parsing email {idx} of {num_entries}...")
        email = GmailMboxMessage(email_obj)
        if any(pattern in email.email_from for pattern in patterns):
            continue
        emails.append(email)
    return emails
def with_col_count(df, column):
    """Return ``df`` with an added ``<column>_count`` column.

    For each row, the new column holds the number of rows in ``df`` that share
    that row's value in ``column``. The input frame is not modified.

    Args:
        df: DataFrame containing ``column``.
        column: Name of the column whose values are counted.

    Returns:
        A new DataFrame: ``df`` merged with the per-value counts.
    """
    occurrences = df.groupby(column).size()
    occurrences = occurrences.rename(f"{column}_count")
    # Join each row's value against the counts, which are indexed by value.
    return df.merge(occurrences, left_on=column, right_index=True)
def senders_summary(emails):
    """Build a per-sender statistics table from parsed e-mails.

    Each e-mail's ``email_from`` is split with ``parse_sender`` and annotated
    with how often its address, display name and full sender string occur.
    Duplicate rows are dropped and the result is sorted by address count, then
    name count, both descending.

    Args:
        emails: Iterable of objects exposing an ``email_from`` string.

    Returns:
        A DataFrame with the columns ``sender``, ``sender_name``,
        ``sender_email``, ``sender_email_count``, ``sender_name_count`` and
        ``sender_count``. It is empty (but has those columns) when ``emails``
        is empty.
    """
    records = [parse_sender(email.email_from) for email in emails]
    if not records:
        # Without rows the frame would have no columns and the group-by below
        # would raise KeyError; return an empty summary with the usual schema.
        return pd.DataFrame(
            columns=[
                "sender",
                "sender_name",
                "sender_email",
                "sender_email_count",
                "sender_name_count",
                "sender_count",
            ]
        )

    df = pd.DataFrame(records)
    for counted_column in ("sender_email", "sender_name", "sender"):
        df = with_col_count(df, counted_column)
    df = df.drop_duplicates()
    return df.sort_values(
        by=["sender_email_count", "sender_name_count"], ascending=False
    )
if __name__ == "__main__":
    # Script entry point: read the mbox file, summarise its senders, write the
    # summary to a .csv file and print it.
    args = parse_args()
    print(f"Reading mbox file '{args.input}'...")
    # create=False: a mistyped path must fail instead of silently creating an
    # empty mailbox file (mailbox.mbox creates missing files by default).
    mbox = mailbox.mbox(args.input, create=False)
    print()
    try:
        emails = read_mailbox_mails(
            mbox, exclude_sender_patterns=args.exclude_senders
        )
    finally:
        # Release the underlying file handle once all messages are loaded.
        mbox.close()
    senders_summary_df = senders_summary(emails)
    senders_summary_df.to_csv(args.output_senders, index=False, quoting=csv.QUOTE_ALL)
    print("Email senders:")
    print(senders_summary_df)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment