Created
January 27, 2020 18:36
-
-
Save PandaWhoCodes/165abb43291f92a12efa077d0c6e8b11 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import imaplib | |
from email.parser import BytesParser | |
from pprint import pprint | |
import email.header | |
import time | |
import json | |
import csv | |
# See the blog article https://teklern.blogspot.com/2017/11/download-all-your-email-information.html for instructions.
# To use: set your username and password below, set ouput_filename to the output path
# and output_type to json or csv, then run the script.
# Settings
username = "email"
password = "password"

# Path of the output file WITHOUT its extension; the extension is chosen by output_type.
ouput_filename = "allMail"
output_type = "json"  # set to "csv" to save a CSV file instead

imapAddress = "imap.gmail.com"

# Column order used for the CSV header and rows.
column_names = [
    "n", "From", "To", "Subject", "Date", "Received", "Rfc822msgid", "Size", "uid",
    "Attachments", "text/plain", "text/html",
]

chunk = 250  # number of emails requested from the IMAP server per fetch
start = 0  # zero-based offset of the first message to fetch (0 = first message)
# Intended cap on the last message number (None = whole account).
# NOTE: the main script currently does not apply this value.
endAt = 50
def f_recieved(s):
    """Turn a Message-ID header value into a Gmail search term.

    Returns a one-entry dict mapping 'Rfc822msgid' to 'Rfc822msgid:<s>'; the
    value can be pasted into the Gmail search box to find the message.
    """
    search_term = f'Rfc822msgid:{s}'
    return dict(Rfc822msgid=search_term)
# Headers to save for every email, as {header name: converter}.
# A converter of None stores the header value unchanged under the header's own name;
# otherwise the converter is called with the value and the dict it returns is merged
# into the record.
key_map = {
    'From': None,
    'To': None,
    'Subject': None,
    'Date': None,
    'Received': None,
    'Message-ID': f_recieved,
}
def parse_parts(msg, key_map):
    """Collect the headers named in key_map from msg into a flat dict.

    For each header name in key_map whose value in msg is truthy:
      * if the mapped converter is falsy, store the value under the header name;
      * otherwise call the converter with the value and merge the dict it returns.
    Headers that are missing or empty are skipped.
    """
    collected = {}
    for header_name, convert in key_map.items():
        value = msg[header_name]
        if not value:
            continue
        if isinstance(value, email.header.Header):
            value = str(value)  # Header objects appear for non-ASCII header content
        if convert:
            collected.update(convert(value))
        else:
            collected[header_name] = value
    return collected
def decode_part(part, mime_type, context=None):
    """Return the decoded text of *part* if its content type is *mime_type*.

    Args:
        part: an email message part (as yielded by ``Message.walk()``).
        mime_type: the content type to extract, e.g. 'text/plain'.
        context: optional label (message position / id) included in the
            diagnostic printed when decoding fails.

    Returns:
        The payload decoded with the part's declared charset (UTF-8 when none
        is declared), or "" when the part has a different content type, has no
        payload, or its charset cannot be used.
    """
    if part.get_content_type() != mime_type:
        return ""
    payload = part.get_payload(decode=True)
    if payload is None:  # nothing to decode (e.g. a multipart container)
        return ""
    # The charset is not always present in the email data; assume UTF-8 then.
    charset = part.get_content_charset() or 'utf-8'
    try:
        # 'replace' substitutes undecodable bytes, so a failure here essentially
        # means the declared charset name is unknown or unusable.
        return payload.decode(charset, 'replace')
    except (LookupError, ValueError):
        # Report the failure using only data available in this call; the label
        # is in a form that works with the Gmail search box when it carries the id.
        prefix = f"{context}:" if context else ""
        print(f"** {prefix}Decode Error, {mime_type} part skipped")
        pprint(payload)  # show what caused the error
        print('----------')
        return ""  # no text for this part on error


def decode_email(msg_str, pos, key_map):
    """Parse one raw email and build its record dict.

    Args:
        msg_str: the raw message as bytes.
        pos: position of the message in the mailbox; used only to label
            decode diagnostics.
        key_map: header selection passed through to ``parse_parts``.

    Returns:
        A dict with the selected headers plus 'Size' (length of msg_str),
        'Attachments' (list of filenames, only when there are any) and either
        'text/plain' or, when no plain text exists, 'text/html'.
    """
    message = BytesParser().parsebytes(msg_str)
    parts = parse_parts(message, key_map)  # header parts specified in key_map
    parts['Size'] = len(msg_str)
    # Label for diagnostics; not every message carries a Message-ID.
    context = f"pos {pos} {parts.get('Rfc822msgid', '(no Message-ID)')}"

    plain_chunks = []
    html_chunks = []
    filenames = []
    for part in message.walk():
        plain = decode_part(part, 'text/plain', context)
        if plain:
            plain_chunks.append(plain)
        # HTML is only a fallback: once any plain text has been found, HTML
        # parts are no longer decoded.
        if not plain_chunks:
            html = decode_part(part, 'text/html', context)
            if html:
                html_chunks.append(html)
        filename = part.get_filename()
        if filename:
            filenames.append(filename)

    if filenames:
        parts['Attachments'] = filenames
    if plain_chunks:
        parts['text/plain'] = ''.join(plain_chunks)
    elif html_chunks:
        parts['text/html'] = ''.join(html_chunks)
    return parts
def store_json(file, recs):
    """Write recs to '<file>.json' as JSON with sorted keys and 4-space indent."""
    target = file + '.json'
    with open(target, 'w') as out:
        serialized = json.dumps(recs, indent=4, sort_keys=True)
        out.write(serialized)
def store_csv(file, recs, fieldnames=None):
    """Write recs to '<file>.csv' with a header row.

    Args:
        file: output path without the '.csv' extension.
        recs: iterable of dicts; keys missing from a record are written empty.
        fieldnames: column order; defaults to the module-level column_names.
    """
    if fieldnames is None:
        fieldnames = column_names
    # newline='' lets the csv module control line endings (otherwise blank rows
    # appear on Windows and embedded newlines can be mangled); an explicit UTF-8
    # encoding keeps non-ASCII mail text from failing under other locale encodings.
    with open(file + '.csv', 'w', newline='', encoding='utf-8') as f:
        dict_writer = csv.DictWriter(f, fieldnames)
        dict_writer.writeheader()
        dict_writer.writerows(recs)
if __name__ == '__main__':
    # Download every message from the Gmail "All Mail" folder in chunks, decode
    # each into a record, and save all records as JSON or CSV.
    t0 = time.time()
    ms = imaplib.IMAP4_SSL(imapAddress)  # open imap session ms
    ms.login(username, password)
    if ms.state != "AUTH":
        print("login Failed")
        raise SystemExit(1)
    print("logged in OK")

    recs = []
    try:
        # Select all mail; this folder name is specific to Gmail.
        # NOTE: the double quotes are part of the select argument.
        ms.select('"[Gmail]/All Mail"')
        result, data = ms.uid('search', None, 'ALL')  # uids of all emails
        uids = data[0].split()  # parse into array
        n = len(uids)  # number of all emails
        print(f"Total Number of emails: {n}")
        # NOTE: the endAt setting is not applied here, so the whole mailbox is fetched.

        for i in range(start, n, chunk):  # fetch `chunk` emails each time
            srange = f'{i+1}:{min(i+chunk,n)}'
            resp, data = ms.fetch(srange, '(RFC822)')
            pos = i  # keeps the progress line valid if this chunk yields no messages
            raw_messages = (m[1] for m in data if isinstance(m, tuple))
            for offset, msg in enumerate(raw_messages):
                pos = i + offset + 1  # 1-based sequence number of this message
                parts = decode_email(msg, pos, key_map)
                parts['uid'] = str(int(uids[pos - 1]))
                parts['n'] = pos
                try:
                    json.dumps(parts)  # keep only records that can be serialized
                except (TypeError, ValueError):
                    # Rare; not every message has a Message-ID, so look it up safely.
                    msgid = parts.get('Rfc822msgid', '(no Message-ID)')
                    print(f"** pos {pos} {msgid}:json dump fail")
                    pprint(parts)
                    print('---------')
                else:
                    recs.append(parts)
            elapsed_mins = (time.time() - t0) / 60  # mins from start
            print(f'@ {pos}/{n} {pos/n*100:.1f}% elapsed: {elapsed_mins:.2f} mins')
    finally:
        ms.logout()  # always close the session, even if a fetch or decode fails

    if output_type == 'json':
        store_json(ouput_filename, recs)
    else:
        store_csv(ouput_filename, recs)
    print('*** DONE ***')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment