Last active
September 2, 2021 04:08
-
-
Save svenk/7468185971b846481d5502ff22cbdc95 to your computer and use it in GitHub Desktop.
Email inboxes (mbox) to CSV
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# A python3 script to read an mbox file (extension to Maildir is trivial thanks to | |
# mailbox module) and writes out certain headers as CSV file. | |
# On large mbox files (say 5GB) it is slow because... it's Python. | |
# The script deals with some cornercases (encoding, newlines, ...) which I found in | |
# 100K emails. Otherwise, the builtin python mail libraries provide robustness. | |
# | |
# It processes roughly 110K (5GB mbox) mails in 2:35mins on my laptop and produces | |
# a 19MB CSV file. | |
# | |
# svenk, 2019-02-27 | |
# all included in python3 | |
import mailbox, csv, re, sys | |
from itertools import islice | |
from collections import defaultdict | |
from email.header import decode_header, Header | |
from email.utils import parsedate_to_datetime # handy | |
def chain(f, g):
    """Compose two one-argument callables: the result maps x to f(g(x))."""
    def composed(x):
        # g runs first, f is applied to its result
        return f(g(x))
    return composed
# Optional progress bar (apt-get install python3-tqdm).
# Without tqdm, a pass-through stand-in is used so the rest of the script
# can call tqdm(...) unconditionally.
try:
    from tqdm import tqdm
except ModuleNotFoundError:
    def tqdm(x, *_args, **_kwargs):
        """Stand-in for tqdm.tqdm: hand back the iterable, ignore all options."""
        return x

    print("Info: If you had installed tqdm, you would see a progress bar", file=sys.stderr)
def my_decode_header(header):
    """Decode an RFC 2047 encoded e-mail header into a plain string.

    A header encoded like
    ``"=?iso-8859-1?Q?Henner_B=FCsching?= <buschin@example.com>"``
    becomes ``"Henner Büsching <buschin@example.com>"``. Strings without
    encoded words are returned unchanged.

    Args:
        header: A ``str`` or an ``email.header.Header`` instance.

    Returns:
        The decoded header, always as ``str``.

    Raises:
        ValueError: If *header* is neither ``str`` nor ``Header``.
    """
    def decode_header_part(binary, encoding):
        # decode_header() can hand back parts that are already str
        if isinstance(binary, str):
            return binary
        try:
            return binary.decode(encoding=encoding if encoding else "utf-8")
        except (UnicodeDecodeError, LookupError):
            # UnicodeDecodeError: bytes do not fit the declared charset.
            # LookupError: the declared charset label is unknown to Python
            # (e.g. "x-unknown"); previously this aborted the whole run.
            # Either way, keep the ASCII part and drop any broken characters.
            return binary.decode("ascii", errors="ignore")

    if isinstance(header, str):
        # try to decode it, part by part
        return "".join(
            decode_header_part(binary, encoding)
            for (binary, encoding) in decode_header(header)
        )
    elif isinstance(header, Header):
        return str(header)
    else:
        raise ValueError("Don't understand header, is %s" % str(type(header)))
def remove_newlines(header):
    """Replace each CR or LF in a header value with a single space.

    Embedded line breaks would otherwise break the one-row-per-mail CSV.
    """
    return re.sub("[\r\n]", " ", header)
def header_filter(header):
    """Decode a header with my_decode_header, then strip its newlines."""
    decoded = my_decode_header(header)
    return remove_newlines(decoded)
# Open the mbox file
mb = mailbox.mbox("./Alle-E-Mails-inkl-Spam-und-E-3.mbox")

# mailbox.Message fields we are interested in; these are also the CSV columns
fields = ["To", "From", "CC", "Date", "Subject", "X-Gmail-Labels"]

# Filters to apply per field; fields without an entry pass through unchanged
filters = defaultdict(lambda: lambda x: x)  # default: identity function
filters["To"] = header_filter
filters["From"] = header_filter
filters["Subject"] = header_filter
filters["CC"] = header_filter

# Open the output file. The encoding is pinned to UTF-8 because decoded
# headers contain arbitrary Unicode; relying on the platform default can
# raise UnicodeEncodeError. The `with` block guarantees the file is closed
# (and thereby fully flushed) even if processing fails half-way.
with open("stats-from-mbox.csv", "w", encoding="utf-8", newline="") as csvfile:
    writer = csv.DictWriter(csvfile, delimiter="\t", quotechar='"', quoting=csv.QUOTE_ALL, fieldnames=fields)
    writer.writeheader()

    print("Generating mbox index (this may take up to some minutes for large files)...")
    number_of_messages = len(mb)
    flush_every = 100  # flush periodically so partial results are on disk

    print("Now iterating %d messages..." % number_of_messages)
    for i, msg in tqdm(enumerate(mb.itervalues()), total=number_of_messages):
        # Missing headers become empty cells; present ones go through their filter
        out_dict = {field: (filters[field](msg[field]) if field in msg else "") for field in fields}
        writer.writerow(out_dict)
        if i % flush_every == 0:
            csvfile.flush()

# Release the mbox file handle now that all messages have been read
mb.close()
print("Done")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Replacing
csvfile = open("stats-from-mbox.csv", "w", newline="")
with
csvfile = open("stats-from-mbox.csv", "w", encoding="utf-8", newline="")
would help address encoding issues.