Skip to content

Instantly share code, notes, and snippets.

@svenk
Last active September 2, 2021 04:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save svenk/7468185971b846481d5502ff22cbdc95 to your computer and use it in GitHub Desktop.
Save svenk/7468185971b846481d5502ff22cbdc95 to your computer and use it in GitHub Desktop.
Email inboxes (mbox) to CSV
#!/usr/bin/env python3
# A python3 script that reads an mbox file (extending it to Maildir is trivial thanks
# to the mailbox module) and writes out certain headers as a CSV file.
# On large mbox files (say 5GB) it is slow because... it's Python.
# The script deals with some corner cases (encoding, newlines, ...) which I found in
# 100K emails. Otherwise, the builtin python mail libraries provide robustness.
#
# It processes roughly 110K (5GB mbox) mails in 2:35mins on my laptop and produces
# a 19MB CSV file.
#
# svenk, 2019-02-27
# all included in python3
import mailbox, csv, re, sys
from itertools import islice
from collections import defaultdict
from email.header import decode_header, Header
from email.utils import parsedate_to_datetime # handy
def chain(f, g):
    """Compose two one-argument callables so that ``chain(f, g)(x) == f(g(x))``."""
    def composed(x):
        # g runs first, f is applied to its result.
        return f(g(x))
    return composed
# Optional progress bar; install it with: apt-get install python3-tqdm
try:
    from tqdm import tqdm
except ModuleNotFoundError:
    def tqdm(x, *_args, **_kwargs):
        """Stand-in for tqdm: hand back the iterable untouched, ignoring all options."""
        return x
    print("Info: If you had installed tqdm, you would see a progress bar", file=sys.stderr)
def my_decode_header(header):
    """Decode an RFC 2047 encoded e-mail header into a plain string.

    For example
    ``"=?iso-8859-1?Q?Henner_B=FCsching?= <buschin@example.com>"``
    becomes ``"Henner Büsching <buschin@example.com>"``. Strings without
    encoded words are returned unchanged.

    Args:
        header: The header value, either a ``str`` or an
            ``email.header.Header`` instance.

    Returns:
        The decoded header, always as a ``str``.

    Raises:
        ValueError: If ``header`` is neither a ``str`` nor a ``Header``.
    """
    def decode_header_part(binary, encoding):
        # decode_header() hands back str (not bytes) when the header
        # contains no encoded words at all.
        if isinstance(binary, str):
            return binary
        try:
            return binary.decode(encoding=encoding if encoding else "utf-8")
        except (UnicodeDecodeError, LookupError):
            # UnicodeDecodeError: the bytes are invalid for the declared charset.
            # LookupError: the declared charset is not a codec Python knows
            # (e.g. "unknown-8bit"); one such mail must not abort the run.
            # Either way, keep the ASCII part and drop the broken characters.
            return binary.decode("ascii", errors="ignore")

    if isinstance(header, str):
        # Decode every (bytes, charset) chunk and glue the pieces together.
        return "".join(
            decode_header_part(binary, encoding)
            for (binary, encoding) in decode_header(header)
        )
    if isinstance(header, Header):
        return str(header)
    raise ValueError("Don't understand header, is %s" % str(type(header)))
def remove_newlines(header):
    """Replace every CR and LF in a one-line header with a space.

    Embedded line breaks would otherwise break the CSV output.
    """
    return re.sub("[\r\n]", " ", header)
def header_filter(header):
    """Decode a header with my_decode_header, then strip its line breaks."""
    decoded = my_decode_header(header)
    return remove_newlines(decoded)
# Open the mbox file. create=False makes a mistyped path raise
# mailbox.NoSuchMailboxError instead of silently creating an empty mailbox
# (the mailbox.mbox default is create=True).
mb = mailbox.mbox("./Alle-E-Mails-inkl-Spam-und-E-3.mbox", create=False)

# mailbox.Message fields we are interested in; they become the CSV columns.
fields = ["To", "From", "CC", "Date", "Subject", "X-Gmail-Labels"]

# Per-field filters. Fields without an explicit entry fall back to the
# identity function, i.e. their value is written as-is.
filters = defaultdict(lambda: lambda x: x)
filters["To"] = header_filter
filters["From"] = header_filter
filters["Subject"] = header_filter
filters["CC"] = header_filter

try:
    # The with-block guarantees the output is flushed and closed even if a
    # message fails half-way. The explicit UTF-8 encoding keeps decoded
    # headers writable regardless of the platform's locale encoding.
    with open("stats-from-mbox.csv", "w", encoding="utf-8", newline="") as csvfile:
        writer = csv.DictWriter(
            csvfile,
            delimiter="\t",
            quotechar='"',
            quoting=csv.QUOTE_ALL,
            fieldnames=fields,
        )
        writer.writeheader()

        print("Generating mbox index (this may take up to some minutes for large files)...")
        number_of_messages = len(mb)
        flush_every = 100  # flush periodically so partial output is visible on disk

        print("Now iterating %d messages..." % number_of_messages)
        for i, msg in tqdm(enumerate(mb.itervalues()), total=number_of_messages):
            # A header missing from a message becomes an empty cell.
            out_dict = {
                field: (filters[field](msg[field]) if field in msg else "")
                for field in fields
            }
            writer.writerow(out_dict)
            if i % flush_every == 0:
                csvfile.flush()
finally:
    # Release the mbox file handle whether or not the export succeeded.
    mb.close()

print("Done")
@tareeko
Copy link

tareeko commented Sep 2, 2021

replacing csvfile = open("stats-from-mbox.csv", "w", newline="") with csvfile = open("stats-from-mbox.csv", "w", encoding="utf-8", newline="") would help address encoding issues.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment