Last active
January 3, 2023 10:59
-
-
Save thiscantbeserious/d5a5d79dbd1b41a44b0a49662fe28ed4 to your computer and use it in GitHub Desktop.
OpenSource Gmail Takeout MBox extractor (with Label to Folder structure into separate .eml files)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# This will extract an mbox from Google Takeout into separate .eml files (manually unzip it first)
# Will try to create a folder structure based on the labels
# Emails will be formatted like this:
# 221104_110405_sender@mail.com_subject_in_45_chars.eml
# e.g. ./Inbox/2022/10/Special/221104_110405_sender@mail.com_subject_in_45_chars.eml
#
# Should work for all languages, but has been tested on German with a 14 GB mbox file (with 169039 messages)
# Result: 169039 messages processed, 167591 messages saved
# Failed exports are logged, so there might be room for further fixes / improvements
# The ability to export attachments will be added later on
from email.header import decode_header | |
from email.parser import HeaderParser | |
from email.generator import Generator | |
from datetime import datetime | |
from email.utils import parsedate_to_datetime | |
from email.utils import parseaddr | |
from functools import reduce | |
import re | |
import sys | |
import getopt | |
import mailbox | |
import os | |
import unicodedata | |
import textwrap | |
# strftime pattern used as the timestamp prefix of every exported file name.
FILE_DT_FORMAT = '%y%m%d_%H%M%S'

# Matches labels of the form "<prefix>:<name>" (the name is capture group 1) and
# labels containing "ategor" (the remainder is capture group 2).
isFolder = re.compile('.*:(.*)|.*ategor(.*)')  # TODO more languages

# All output is written below "<working directory at import time>/extracted".
cwd = os.getcwd()
baseDir = os.path.join(cwd, 'extracted')
def getFolderName(inputStr):
    """Extract a folder name from a label of the form "<prefix>:<name>".

    The label is matched against the module-level ``isFolder`` pattern and the
    text captured by its first group is returned with single and double quotes
    removed and surrounding whitespace and colons stripped.

    Returns None when *inputStr* is empty or not exactly a ``str``, when the
    pattern does not match, or when the first group captured nothing.
    """
    if not inputStr or type(inputStr) != str:
        return None

    found = isFolder.match(inputStr)
    if found is None:
        return None

    captured = found.group(1)
    if not captured:
        return None

    for quote in ('"', "'"):
        captured = captured.replace(quote, '')
    return captured.strip().strip(':')
def saveMessageToFile(message, filePath):
    """Write *message* to *filePath* as flattened message text (.eml).

    The export is best-effort: any failure while opening or writing the file is
    reported on stdout and the function returns normally, so one bad message
    does not abort a long-running extraction.

    Args:
        message: an ``email.message.Message`` (or subclass) to serialize.
        filePath: destination path; its parent directory must already exist.
    """
    try:
        # The context manager guarantees the handle is flushed and closed even
        # when flatten() fails halfway through.
        with open(filePath, 'w') as outFile:
            Generator(outFile).flatten(message)
    except Exception as err:
        # "except Exception" (not a bare except) so Ctrl+C still stops the run.
        print(f'error processing file:{filePath} ({type(err).__name__}: {err})')
def slugify(value, allow_unicode=False):
    """
    Adapted from https://github.com/django/django/blob/master/django/utils/text.py

    Turn *value* (converted with str()) into a lowercase slug:
    - with 'allow_unicode' False the text is NFKD-normalized and every
      non-ASCII character is dropped; otherwise it is NFKC-normalized,
    - characters that are not word characters, whitespace or hyphens are removed,
    - every run of underscores and/or whitespace becomes a single underscore,
    - leading and trailing hyphens and underscores are stripped.
    """
    text = str(value)
    normal_form = 'NFKC' if allow_unicode else 'NFKD'
    text = unicodedata.normalize(normal_form, text)
    if not allow_unicode:
        text = text.encode('ascii', 'ignore').decode('ascii')
    text = re.sub(r'[^\w\s-]', '', text.lower())
    text = re.sub(r'[_\s]+', '_', text)
    return text.strip('-_')
def shorten(text: str, width=45, placeholder=""):
    """Truncate *text* so that it fits into *width* characters.

    Falsy or non-str input is returned as ``str(text)``. Otherwise the text is
    stripped of surrounding whitespace and returned unchanged if it already fits.
    Longer text is shortened with ``textwrap.shorten`` (which collapses whitespace
    and appends *placeholder*); if that result keeps less than half of the usable
    width, the stripped text is cut hard at the width limit instead.
    """
    if not (text and isinstance(text, str)):
        return str(text)

    stripped = text.strip()
    if len(stripped) <= width:
        return stripped

    # textwrap.shorten raises ValueError if the placeholder does not fit into width.
    by_words = textwrap.shorten(stripped, width=width, placeholder=placeholder)

    # Characters available for real content once the placeholder is accounted for.
    budget = width - len(placeholder)
    if len(by_words) - len(placeholder) >= budget * 0.5:
        return by_words

    # Word-wise shortening dropped too much (e.g. one very long word): cut hard.
    return stripped[:budget].strip() + placeholder
def getNextFreeFileName(dir, filename, extension, count=1, max=20):
    """Find an unused "<filename>_<n>" name inside *dir*.

    Starting at *count*, each candidate "<filename>_<n><extension>" is checked
    for existence in *dir*. The first free candidate is returned WITHOUT the
    extension. Returns None once the counter reaches *max* (which is exclusive).
    """
    suffix = count
    while suffix < max:
        candidate = f'{filename}_{suffix}'
        if not os.path.exists(os.path.join(dir, f'{candidate}{extension}')):
            return candidate
        suffix += 1
    return None
def _decodeSubjectHeader(rawSubject):
    """Decode all RFC 2047 encoded words of a subject header into one str."""
    pieces = []
    # decode_header() yields (chunk, charset) pairs; chunk is bytes for encoded
    # words and may be str for plain text.
    for chunk, charset in decode_header(rawSubject):
        if isinstance(chunk, bytes):
            try:
                chunk = chunk.decode(charset or "ascii", errors="replace")
            except LookupError:
                # The header names a charset Python does not know.
                chunk = chunk.decode("ascii", errors="replace")
        pieces.append(chunk)
    return "".join(pieces)


def getNormalizedFileNameForMessage(message, dir=None):
    """Build the .eml file name "<date>_<sender>_<subject>.eml" for *message*.

    Parts that are missing from the message are left out:
    - date: the "date" header converted to local time and formatted with
      FILE_DT_FORMAT; if it cannot be parsed, the slugified raw header is used.
    - sender: the address part of the "from" header, with characters that are
      unsafe in file names replaced by underscores.
    - subject: the decoded "subject" header, slugified and shortened.

    Args:
        message: object offering ``get(header_name)`` (e.g. an email Message).
        dir: optional target directory; when a file with the generated name
            already exists there, a "_<n>" suffix is added via
            getNextFreeFileName.

    Returns:
        The file name including the ".eml" extension, or None when the message
        is falsy, no part could be derived, or no free name was found in *dir*.
    """
    if not message:
        return None
    extension = ".eml"

    rawDate = message.get('date')
    try:
        date = datetime.fromtimestamp(parsedate_to_datetime(rawDate).timestamp()).strftime(FILE_DT_FORMAT)
    except Exception:
        # Unparseable date: keep the information but make it file-name safe.
        date = slugify(rawDate) if rawDate else None

    rawSender = message.get('from')
    try:
        sender = parseaddr(rawSender)[-1]
    except Exception:
        sender = rawSender
    if sender and isinstance(sender, str):
        # Path separators or reserved characters would break the target path.
        sender = re.sub(r'[\\/:*?"<>|\s]+', '_', sender)

    rawSubject = message.get('subject')
    subject = None
    if rawSubject is not None:
        try:
            subject = _decodeSubjectHeader(rawSubject)
        except Exception:
            # Malformed encoded words: fall back to the raw header text.
            subject = str(rawSubject)
        subject = shorten(slugify(subject))

    parts = [part for part in (date, sender, subject) if part and isinstance(part, str)]
    if not parts:
        return None
    filename = "_".join(parts)

    if dir and os.path.exists(os.path.join(dir, f'{filename}{extension}')):
        filename = getNextFreeFileName(dir, filename, extension)
        if not filename:
            return None
    return filename + extension
# Labels that only describe message state; they never become folders.
_STATUS_LABELS = frozenset({
    "chat", "unread", "ungelesen", "opened", "geöffnet", "starred", "utf-8",
})


# If True is returned as the first element this means SKIP this entry.
def parseLabelsIntoFolders(gmail_labels, read, flagged, sent, message, maxDepth=4):
    """Translate a decoded X-Gmail-Labels header into a folder path and flags.

    Args:
        gmail_labels: one ``(value, charset)`` pair as produced by
            ``email.header.decode_header``; value may be bytes or str and holds
            the comma separated labels.
        read, flagged, sent: current flag values; returned updated.
        message: object offering ``get("date")``; the date supplies the
            year/month folders (left out when it cannot be parsed).
        maxDepth: maximum number of path components that are kept.

    Returns:
        A 5-tuple ``(folders, encoding, read, flagged, sent)``. ``folders`` is
        the list of path components (root folder, year, month, labels), or the
        boolean True when the message must be skipped (chat and spam messages).
        For empty input the tuple is ``([], None, read, flagged, sent)`` so
        callers can always unpack five values.
    """
    if not gmail_labels:
        return [], None, read, flagged, sent

    encoding = gmail_labels[-1] or "ASCII"
    labelText = gmail_labels[0]
    if isinstance(labelText, bytes):
        try:
            labelText = labelText.decode(encoding, errors="replace")
        except LookupError:
            # The header names a charset Python does not know.
            labelText = labelText.decode("ascii", errors="replace")
    labels = labelText.split(',')

    # NOTE(review): the depth is clamped to the number of labels although the
    # path also contains root/year/month, so messages with few labels lose the
    # month (or year) level. Kept as-is to preserve the existing output layout.
    maxDepth = min(maxDepth, len(labels))

    rootFolder = "Posteingang"
    # try to handle special labels (can fail)
    if "Chat" in labels:
        return True, encoding, read, flagged, sent
    # TODO generic solution, currently just english and german should work
    if "Unread" in labels or "Ungelesen" in labels:
        read = False
    if "Starred" in labels:
        flagged = True
    if "Inbox" in labels:
        rootFolder = "Inbox"
    if "Posteingang" in labels:
        rootFolder = "Posteingang"
    if "Sent" in labels:
        sent = True
        rootFolder = "Sent"
    if "Gesendet" in labels:
        sent = True
        rootFolder = "Gesendet"
    if "Spam" in labels:
        return True, encoding, read, flagged, sent

    folders = [rootFolder]
    try:
        localDate = datetime.fromtimestamp(parsedate_to_datetime(message.get("date")).timestamp())
        year = localDate.strftime('%Y')
        # str(month) instead of strftime('%-m'): "%-m" only exists on some
        # platforms and the month folder was silently dropped elsewhere.
        month = str(localDate.month)
    except Exception:
        # Missing or unparseable date: no year/month level for this message.
        pass
    else:
        folders.extend((year, month))

    for label in labels:
        if not label:
            continue
        label = re.sub(r'[\r\n"]', '', label).strip(':,. ')
        if not label:
            # Nothing left after cleaning; an empty path component is useless.
            continue
        lowered = label.lower()
        if lowered in _STATUS_LABELS:
            continue
        if lowered == "sent" and ("Inbox" in folders or "Posteingang" in folders):
            continue
        if lowered == "posteingang" and ("Sent" in folders or "Gesendet" in folders):
            continue
        folderName = getFolderName(label)
        if folderName and folderName not in folders:
            folders.append(folderName)
        elif label not in folders:
            folders.append(label)

    return folders[:maxDepth], encoding, read, flagged, sent
def main(argv):
    """Extract every message of an mbox file into separate .eml files.

    Command line options (in *argv*, without the program name):
        -i / --infile  path of the mbox file (default "source.mbox")
        -p / --prefix  output prefix; currently only echoed, not used for paths

    For each message the X-Gmail-Labels header is turned into a folder path
    below ``baseDir``, Status/X-Status headers are added, the message is
    written with saveMessageToFile and the file's access/modification time is
    set to the message date. Chat and spam messages are skipped. Exits with
    status 2 on invalid options.
    """
    in_mbox = "source.mbox"
    prefix = ""
    try:
        opts, _args = getopt.getopt(argv, "i:p:", ["infile=", "prefix="])
    except getopt.GetoptError:
        print("python gmail_extractor.py -i <infile> -p <prefix>")
        sys.exit(2)
    for opt, arg in opts:
        if opt in ("-i", "--infile"):
            in_mbox = arg
        elif opt in ("-p", "--prefix"):
            prefix = arg

    print(f'Processing file "{in_mbox}", output prefix "{prefix}"')
    sys.stdout.flush()

    sourcembox = mailbox.mbox(in_mbox, create=False)
    mcount = msaved = mskipped = 0
    try:
        print(f"{len(sourcembox)} messages to process")
        sys.stdout.flush()

        for message in sourcembox:
            mcount += 1
            read = True
            flagged = False
            sent = False
            folders = []

            gmail_labels = None
            rawLabels = message["X-Gmail-Labels"]
            if rawLabels:
                try:
                    gmail_labels = decode_header(rawLabels)[0]
                except Exception:
                    # Undecodable header: treat the message as unlabeled.
                    gmail_labels = None

            if gmail_labels:
                parsed = parseLabelsIntoFolders(gmail_labels, read, flagged, sent, message)
                if parsed:
                    folders, _encoding, read, flagged, sent = parsed
            if folders is True:
                # A boolean True instead of a folder list means: skip (chat/spam).
                continue
            if not folders:
                folders = ["Postausgang"] if sent else ["Posteingang"]

            # fixup missing status flags in the message
            message["Status"] = "RO" if read else "O"
            if flagged:
                message["X-Status"] = "F"

            absFoldersPath = os.path.join(baseDir, os.path.join(*folders))
            try:
                os.makedirs(absFoldersPath, exist_ok=True)
            except OSError as err:
                mskipped += 1
                print(f'skipped {mskipped} messages (+1 - could not create folder {absFoldersPath}: {err})')
                continue

            # Pass the target folder so that name collisions get a "_<n>" suffix.
            fileName = getNormalizedFileNameForMessage(message, absFoldersPath)
            if not fileName:
                mskipped += 1
                print(f'skipped {mskipped} messages (+1 - no filename could be generated)')
                continue

            filePath = os.path.join(absFoldersPath, fileName)
            saveMessageToFile(message, filePath)
            # saveMessageToFile only reports failures, so count what is on disk.
            if os.path.isfile(filePath):
                msaved += 1

            ts = message.get("date")
            if ts:
                try:
                    ts = parsedate_to_datetime(ts).timestamp()
                    os.utime(filePath, (ts, ts))
                except (TypeError, ValueError, OverflowError, OSError):
                    # Best effort only: keep the file with its current timestamps.
                    pass
    finally:
        sourcembox.close()

    print(str(mcount) + " messages processed, " + str(msaved) + " messages saved")
if __name__ == "__main__":
    # Script entry point: hand the command line arguments (without the
    # program name) to main().
    cli_args = sys.argv[1:]
    main(cli_args)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment