Skip to content

Instantly share code, notes, and snippets.

@thiscantbeserious
Last active January 3, 2023 10:59
Show Gist options
  • Save thiscantbeserious/d5a5d79dbd1b41a44b0a49662fe28ed4 to your computer and use it in GitHub Desktop.
Save thiscantbeserious/d5a5d79dbd1b41a44b0a49662fe28ed4 to your computer and use it in GitHub Desktop.
Open-source Gmail Takeout mbox extractor (maps labels to a folder structure of separate .eml files)
#!/usr/bin/env python3
# This will extract an mbox from Google Takeout into separate .eml files (manually unzip the archive first)
# Will try to create a folder structure based on the labels
# Emails will be formatted like this:
# 221104_110405_sender@mail.com_subject_in_45_chars.eml
# e.g. ./Inbox/2022/10/Special/221104_110405_sender@mail.com_subject_in_45_chars.eml
#
# Should work for all languages, but has been tested on German with a 14 GB large mbox file (with 169039 messages)
# Result: 169039 messages processed, 167591 messages saved
# Failed exports are logged, so there may be room for further fixes / improvements
# Later on will add the ability to export attachments too
from email.header import decode_header
from email.parser import HeaderParser
from email.generator import Generator
from datetime import datetime
from email.utils import parsedate_to_datetime
from email.utils import parseaddr
from functools import reduce
import re
import sys
import getopt
import mailbox
import os
import unicodedata
import textwrap
# strftime pattern for the timestamp that starts every exported file name
# (two-digit year, month, day, underscore, hour, minute, second).
FILE_DT_FORMAT = '%y%m%d_%H%M%S'

# Everything is written below "<current working directory>/extracted".
cwd = os.getcwd()
baseDir = os.path.join(cwd, 'extracted')

# Label classifier with two alternatives: "<anything>:<name>" (group 1 captures
# <name>) or any label containing "ategor" (group 2 captures what follows it).
isFolder = re.compile('.*:(.*)|.*ategor(.*)') # TODO more languages
def getFolderName(inputStr):
    """Extract the folder name from a "<prefix>:<folder>" style label.

    Args:
        inputStr: a single label; anything that is not a non-empty str is rejected.

    Returns:
        The text captured by group 1 of the module-level ``isFolder`` pattern
        (the part after the last colon) with quotes, surrounding whitespace and
        surrounding colons removed, or None when the label does not match,
        captures nothing, or is empty after cleaning.
    """
    if not inputStr or not isinstance(inputStr, str):
        return None
    matchFolder = isFolder.match(inputStr)
    if not matchFolder:
        return None
    # Only the colon alternative of isFolder fills group 1; a label matched by
    # the "ategor" alternative leaves it as None and is therefore rejected here.
    folderName = matchFolder.group(1)
    if not folderName:
        return None
    folderName = folderName.replace('"', '').replace("'", '').strip().strip(':')
    # A name made only of quotes/whitespace/colons is not a usable folder.
    return folderName or None
def saveMessageToFile(message, filePath):
    """Write an email message to ``filePath`` in flattened text (.eml) form.

    Args:
        message: a message object accepted by ``email.generator.Generator.flatten``.
        filePath: destination path; an existing file is overwritten.

    Returns:
        None. Failures are reported on stdout instead of being raised so that a
        single bad message does not abort a long export.
    """
    try:
        # The with-block closes the handle even when flatten() fails.
        # utf-8 + surrogateescape keeps the output independent of the locale and
        # lets undecodable bytes from the source message be written back
        # unchanged instead of raising UnicodeEncodeError.
        with open(filePath, 'w', encoding='utf-8', errors='surrogateescape') as outFile:
            Generator(outFile).flatten(message)
    except Exception as error:
        # Best effort by design; Exception (not a bare except) lets Ctrl-C through.
        print(f'error processing file:{filePath} ({error})')
def slugify(value, allow_unicode=False):
    """
    Adapted from https://github.com/django/django/blob/master/django/utils/text.py

    Turn ``value`` into a lowercase token usable inside a file name.
    The value is stringified and Unicode-normalised (NFKC when 'allow_unicode'
    is True, otherwise NFKD followed by dropping everything that is not ASCII).
    Characters other than word characters, whitespace and hyphens are removed,
    each run of underscores/whitespace becomes a single underscore, and leading
    or trailing hyphens and underscores are stripped.
    """
    text = str(value)
    if not allow_unicode:
        decomposed = unicodedata.normalize('NFKD', text)
        text = decomposed.encode('ascii', 'ignore').decode('ascii')
    else:
        text = unicodedata.normalize('NFKC', text)
    lowered = text.lower()
    withoutPunctuation = re.sub(r'[^\w\s-]', '', lowered)
    underscored = re.sub(r'[_\s]+', '_', withoutPunctuation)
    return underscored.strip('-_')
def shorten(text: str, width=45, placeholder=""):
    """Trim ``text`` so that it fits into ``width`` characters.

    Falsy or non-str input is returned as ``str(text)``. Otherwise the text is
    stripped and returned unchanged when it already fits. Longer text is cut at
    a word boundary by ``textwrap.shorten`` (which appends ``placeholder``);
    when that keeps less than half of the available room, the text is cut
    mid-word instead so the start of a long word survives.
    """
    if not (text and isinstance(text, str)):
        return str(text)

    stripped = text.strip()
    if len(stripped) <= width:
        return stripped

    # Room left for real text once the placeholder is accounted for.
    room = width - len(placeholder)

    # textwrap.shorten raises ValueError when the placeholder does not fit into width.
    byWords = textwrap.shorten(stripped, width=width, placeholder=placeholder)
    keptChars = len(byWords) - len(placeholder)

    # textwrap.shorten never keeps part of a word it had to drop, so text led by
    # one very long word can come back nearly empty; fall back to a hard cut then.
    if keptChars < room * 0.5:
        return stripped[:room].strip() + placeholder
    return byWords
def getNextFreeFileName(dir, filename, extension, count=1, max=20):
    """Find an unused "<filename>_<n>" name inside ``dir``.

    Suffixes are tried in ascending order starting at ``count``; ``max`` is an
    exclusive upper bound. Returns the first candidate (without ``extension``)
    for which "<candidate><extension>" does not exist in ``dir``, or None when
    every suffix below ``max`` is already taken.
    """
    suffix = count
    while suffix < max:
        candidate = f'{filename}_{suffix}'
        candidatePath = os.path.join(dir, f'{candidate}{extension}')
        if not os.path.exists(candidatePath):
            return candidate
        suffix += 1
    return None
def getNormalizedFileNameForMessage(message, dir=None):
    """Build the "<date>_<sender>_<subject>.eml" file name for a message.

    Each component is optional and is left out when the matching header is
    missing or unusable:
      * date    - the Date header formatted with FILE_DT_FORMAT in local time;
                  the raw header text is used when it cannot be parsed
      * sender  - the address part of the From header
      * subject - the RFC 2047-decoded Subject, passed through slugify() and shorten()

    Args:
        message: object offering ``get(headerName)`` (e.g. an email message).
        dir: optional target directory; when the name is already taken there,
            getNextFreeFileName() picks a numbered alternative.

    Returns:
        The file name including the ".eml" extension, or None when no component
        is available or no free name could be found.
    """
    if not message:
        return None
    extension = ".eml"

    candidates = (
        _fileNameDatePart(message.get('date')),
        _fileNameSenderPart(message.get('from')),
        _fileNameSubjectPart(message.get('subject')),
    )
    filename = "_".join(part for part in candidates if part and isinstance(part, str))
    if not filename:
        return None

    if dir and os.path.exists(os.path.join(dir, f'{filename}{extension}')):
        filename = getNextFreeFileName(dir, filename, extension)
        if not filename:
            return None
    return filename + extension


# Characters that would split or corrupt a path when used inside one file name.
_FILE_NAME_UNSAFE = re.compile(r'[\\/\x00-\x1f]+')


def _withoutPathSeparators(text):
    """Return text with path separators/control characters replaced by "_" (None for non-str)."""
    if not isinstance(text, str):
        return None
    return _FILE_NAME_UNSAFE.sub('_', text)


def _fileNameDatePart(rawDate):
    """Format a Date header with FILE_DT_FORMAT; fall back to the sanitised raw text."""
    if not rawDate:
        return None
    try:
        stamp = parsedate_to_datetime(rawDate).timestamp()
        return datetime.fromtimestamp(stamp).strftime(FILE_DT_FORMAT)
    except Exception:
        # Unparseable or out-of-range date: keep whatever the header said.
        return _withoutPathSeparators(rawDate)


def _fileNameSenderPart(rawSender):
    """Return the bare e-mail address of a From header."""
    if not rawSender:
        return None
    try:
        address = parseaddr(rawSender)[-1]
    except Exception:
        address = rawSender
    return _withoutPathSeparators(address)


def _fileNameSubjectPart(rawSubject):
    """Decode a Subject header and reduce it to a short slug."""
    if rawSubject is None:
        return None
    try:
        chunks = decode_header(rawSubject)
    except Exception:
        # Malformed encoded-word: slugify the raw header text instead.
        text = str(rawSubject)
    else:
        # decode_header yields one (value, charset) pair per chunk; all of them
        # belong to the subject, not just the first.
        text = "".join(_headerChunkToText(value, charset) for value, charset in chunks)
    return shorten(slugify(text))


def _headerChunkToText(value, charset):
    """Turn one decode_header chunk into str, tolerating bad or unknown charsets."""
    if not isinstance(value, bytes):
        return str(value)
    try:
        return value.decode(charset or 'ascii', errors='replace')
    except LookupError:
        # Charset name unknown to Python (e.g. "unknown-8bit").
        return value.decode('ascii', errors='replace')
# Labels that only describe message state (or are decoding artefacts) and never become folders.
_NON_FOLDER_LABELS = frozenset((
    "chat", "unread", "ungelesen", "opened", "geöffnet", "starred", "utf-8",
))


def parseLabelsIntoFolders(gmail_labels, read, flagged, sent, message, maxDepth=4):
    """Translate a decoded X-Gmail-Labels value into folder components and status flags.

    Args:
        gmail_labels: ``(value, charset)`` pair as produced by
            ``email.header.decode_header``; value may be str or bytes and holds
            the comma-separated labels.
        read: incoming read flag; cleared when an "Unread"/"Ungelesen" label is present.
        flagged: incoming flagged flag; set when a "Starred" label is present.
        sent: incoming sent flag; set when a "Sent"/"Gesendet" label is present.
        message: object offering ``get("date")``; the date supplies the year and
            month folders.
        maxDepth: maximum number of folder components to return.

    Returns:
        A tuple ``(folders, encoding, read, flagged, sent)``. ``folders`` is the
        list of path components (root folder, year, month, then label folders),
        or the literal ``True`` when the message must be skipped ("Chat" or
        "Spam" label). With no labels at all, ``folders`` is an empty list and
        ``encoding`` is None.
    """
    if not gmail_labels:
        # Same 5-tuple shape as every other exit so callers can always unpack.
        return [], None, read, flagged, sent

    encoding = gmail_labels[-1] or "ASCII"
    rawLabels = gmail_labels[0]
    if isinstance(rawLabels, bytes):
        try:
            rawLabels = rawLabels.decode(encoding, errors="replace")
        except LookupError:
            # Charset name unknown to Python.
            rawLabels = rawLabels.decode("ascii", errors="replace")
    splitLabels = rawLabels.split(',')

    # NOTE(review): the depth limit is also capped by the number of labels, so a
    # message with few labels loses its year/month folders. Kept as in the
    # original - confirm whether this is intended.
    maxDepth = min(maxDepth, len(splitLabels))

    # Normalise once (header folding leaves CR/LF and spaces inside labels) so
    # that the special-label checks and the folder names see the same text.
    labels = []
    for rawLabel in splitLabels:
        label = re.sub(r'[\r\n"]', '', rawLabel).strip(':,. ')
        if label:
            labels.append(label)

    # try to handle special labels (can fail)
    # TODO generic solution, currently just english and german should work
    if "Chat" in labels:
        return True, encoding, read, flagged, sent
    if "Unread" in labels or "Ungelesen" in labels:
        read = False
    if "Starred" in labels:
        flagged = True
    rootFolder = "Posteingang"
    if "Inbox" in labels:
        rootFolder = "Inbox"
    if "Posteingang" in labels:
        rootFolder = "Posteingang"
    if "Sent" in labels:
        sent = True
        rootFolder = "Sent"
    if "Gesendet" in labels:
        sent = True
        rootFolder = "Gesendet"
    if "Spam" in labels:
        return True, encoding, read, flagged, sent

    folders = [rootFolder]
    try:
        sentAt = datetime.fromtimestamp(parsedate_to_datetime(message.get("date")).timestamp())
    except Exception:
        # Missing or unparseable date: no year/month level.
        sentAt = None
    if sentAt is not None:
        folders.append(sentAt.strftime('%Y'))
        # str(month) gives the unpadded month on every platform; the strftime
        # code '%-m' is a glibc extension and raises ValueError elsewhere.
        folders.append(str(sentAt.month))

    for label in labels:
        lowered = label.lower()
        if lowered in _NON_FOLDER_LABELS:
            continue
        if lowered == "sent" and ("Inbox" in folders or "Posteingang" in folders):
            continue
        if lowered == "posteingang" and ("Sent" in folders or "Gesendet" in folders):
            continue
        folderName = getFolderName(label)
        if folderName and folderName not in folders:
            folders.append(folderName)
        elif label not in folders:
            # Also reached when folderName is already present: the raw label is
            # then used as its own folder (behaviour kept from the original).
            folders.append(label)

    return folders[:maxDepth], encoding, read, flagged, sent
def main(argv):
in_mbox = "source.mbox"
prefix = ""
try:
opts, args = getopt.getopt(argv, "i:p:", ["infile=", "prefix="])
except getopt.GetoptError:
print("python gmail_extractor.py -i <infile> -p <prefix>")
sys.exit(2)
for opt, arg in opts:
if opt in ("-i", "--infile"):
in_mbox = arg
elif opt in ("-p", "--prefix"):
prefix = arg
print("Processing file \"" + in_mbox + "\", output prefix \"" + prefix + "\"")
sys.stdout.flush()
sourcembox = mailbox.mbox(in_mbox, create=False)
print(str(sourcembox.__len__()) + " messages to process")
sys.stdout.flush()
mcount = mjunk = mchat = msaved = mskipped = 0
for message in sourcembox:
read = True
flagged = False
sent = False
mcount += 1
folders = []
encoding = None
gmail_labels = None
if message["X-Gmail-Labels"]:
try: gmail_labels = decode_header(message["X-Gmail-Labels"])[0]
except: del gmail_labels
#continue
#print(f'Gmail labels found: {gmail_labels}')
if gmail_labels:
folders, encoding, read, flagged, sent = parseLabelsIntoFolders(gmail_labels, read, flagged, sent, message)
#except: None
if folders == True: continue #if folders is a Boolean True that means skip
if not folders:
if sent: folders = ["Postausgang"]
else: folders = ["Posteingang"]
#print(f'Folders parsed: {folders}')
# fixup missing status flags in the message
if read: message["Status"] = "RO"
else: message["Status"] = "O"
if flagged: message["X-Status"] = "F"
msaved += 1
fileName = getNormalizedFileNameForMessage(message);
foldersPath = ""
if not fileName:
mskipped += 1
print(f'skipped {mskipped} messages (+1 - no filename could be generated)')
continue
if folders:
#try:
foldersPath = reduce(os.path.join,folders)
absFoldersPath = os.path.join(baseDir, foldersPath);
try: os.makedirs(absFoldersPath)
except: None
#except:
# foldersPath = ""
# None
else:
foldersPath = ""
fileName = getNormalizedFileNameForMessage(message, os.path.join(baseDir, foldersPath));
if not fileName:
continue
filePath = os.path.join(baseDir, foldersPath, fileName)
saveMessageToFile(message, filePath)
ts = message.get("date")
if ts:
try:
ts = parsedate_to_datetime(ts).timestamp()
os.utime(filePath, (ts, ts))
except:
None
print(str(mcount) + " messages processed, " + str(msaved) + " messages saved")
#print("ignored: " + str(mjunk) + " spam, " + str(mchat) + " mchat")
# Run the export only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    cliArguments = sys.argv[1:]  # everything after the program name
    main(cliArguments)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment