Skip to content

Instantly share code, notes, and snippets.

@PandaWhoCodes
Created January 27, 2020 18:36
Show Gist options
  • Save PandaWhoCodes/165abb43291f92a12efa077d0c6e8b11 to your computer and use it in GitHub Desktop.
Save PandaWhoCodes/165abb43291f92a12efa077d0c6e8b11 to your computer and use it in GitHub Desktop.
import imaplib
from email.parser import BytesParser
from pprint import pprint
import email.header
import time
import json
import csv
# See the blog article https://teklern.blogspot.com/2017/11/download-all-your-email-information.html for instructions.
# Just add your username and password, configure ouput_filename (path) and output_type (json or csv) below,
# and run.
# Settings
# Account credentials passed to the IMAP login in the main script.
username = 'email'
password = 'password'
# path to output file name (leave off the extension)
ouput_filename = 'allMail' # output of all data, note the extension is chosen by the next variable
output_type = 'json' # 'json' writes a JSON file; any other value (e.g. 'csv') writes a CSV file
imapAddress = 'imap.gmail.com' # host name of the IMAP server to connect to
# Column order for CSV output; used as the field names by store_csv.
column_names = ['n', 'From', 'To', 'Subject', 'Date', 'Received', 'Rfc822msgid', 'Size', 'uid',
'Attachments', 'text/plain', 'text/html', ]
chunk = 250 # number of emails to request from the IMAP server in each fetch
start = 0 # start from first message
endAt = 50 # None # set to last number or None to get all email in the account
# NOTE: the statement that applies endAt in the main script is commented out,
# so endAt has no effect unless that line is re-enabled.
def f_recieved(s):
    """Convert a Message-ID header value into an ``Rfc822msgid`` record entry.

    The stored value has the form ``Rfc822msgid:<id>``; just paste it into
    the Gmail search box to find the message.

    Args:
        s: The raw Message-ID header value.

    Returns:
        A one-entry dict ``{'Rfc822msgid': 'Rfc822msgid:<s>'}``.
    """
    search_term = f'Rfc822msgid:{s}'
    return {'Rfc822msgid': search_term}
# Specify the header fields to save and any conversion functions to apply to them.
# A value of None stores the header text under its own name; a callable receives
# the header text and returns a dict of entries to store instead (see parse_parts).
key_map = {'From': None, 'To': None, 'Subject': None, 'Date': None,
'Received': None, 'Message-ID': f_recieved, }
def parse_parts(msg, key_map):
    """Collect the headers named in *key_map* from *msg* into a dict.

    Args:
        msg: Object supporting ``msg[header_name]`` lookup (a parsed email).
        key_map: Mapping of header name to either ``None`` or a converter
            callable. With ``None`` the header value is stored under the
            header name; with a callable, the dict it returns for the header
            value is merged into the result.

    Returns:
        Dict of the extracted entries. Headers whose value is missing or
        empty are left out.
    """
    collected = {}
    for header_name, convert in key_map.items():
        value = msg[header_name]
        if not value:
            continue
        if isinstance(value, email.header.Header):
            # Header objects are flattened to str (fixes non-ASCII parts).
            value = str(value)
        if convert:
            collected.update(convert(value))
        else:
            collected[header_name] = value
    return collected
def decode_part(part, mime_type, pos=None, msgid=None):
    """Return the decoded text of *part* when its content type is *mime_type*.

    Args:
        part: A message part, e.g. one yielded by ``Message.walk()``.
        mime_type: Content type to accept, such as ``'text/plain'``.
        pos: Optional position of the enclosing email; used only in the
            diagnostic printed when decoding fails.
        msgid: Optional ``Rfc822msgid`` search string of the enclosing email
            (works with the Gmail find box); used only in that diagnostic.

    Returns:
        The decoded text, or ``""`` when the part has a different content
        type, carries no payload, or cannot be decoded.
    """
    if part.get_content_type() != mime_type:
        return ""
    payload = part.get_payload(decode=True)
    if payload is None:
        return ""
    # When the coding is not given in the email data (or is empty), assume utf-8.
    charset = part.get_content_charset() or 'utf-8'
    try:
        # 'replace' substitutes undecodable bytes instead of failing on them.
        return payload.decode(charset, 'replace')
    except (LookupError, ValueError):
        # Usually an unknown charset name. The context is passed in by the
        # caller; this function must not rely on globals of the main script.
        print(f"** pos {pos} {msgid}:Decode Error, {mime_type} part skipped")
        pprint(payload)  # and print what caused the error
        print('----------')
        return ""  # no part if error


def decode_email(msg_str, pos, key_map):
    """Parse one raw email into a flat record dict.

    Args:
        msg_str: The complete raw message as bytes.
        pos: Position of the message in the download; used for diagnostics.
        key_map: Header selection/conversion map handed to ``parse_parts``.

    Returns:
        Dict holding the entries produced by ``parse_parts``, plus ``'Size'``
        (length of *msg_str*), ``'Attachments'`` (list of part filenames,
        only when any exist) and the body under ``'text/plain'`` or, when no
        plain text was found, under ``'text/html'``.
    """
    message = BytesParser().parsebytes(msg_str)
    parts = parse_parts(message, key_map)  # header parts specified in key_map
    parts['Size'] = len(msg_str)
    msgid = parts.get('Rfc822msgid')  # may be absent; only used in diagnostics

    plain_body = ''
    html_body = ''
    filenames = []
    for part in message.walk():
        plain_body += decode_part(part, 'text/plain', pos=pos, msgid=msgid)
        if plain_body:
            # Plain text wins: drop any HTML gathered so far and stop collecting it.
            html_body = ''
        else:
            html_body += decode_part(part, 'text/html', pos=pos, msgid=msgid)
        filename = part.get_filename()
        if filename:
            filenames.append(filename)

    if filenames:
        parts['Attachments'] = filenames
    if plain_body:
        parts['text/plain'] = plain_body
    elif html_body:
        parts['text/html'] = html_body
    return parts
def store_json(file, recs):
    """Write *recs* as pretty-printed JSON to ``<file>.json``.

    Args:
        file: Output path without the extension.
        recs: JSON-serializable records; keys are sorted and the output is
            indented by four spaces.
    """
    target = file + '.json'
    with open(target, 'w') as out:
        serialized = json.dumps(recs, sort_keys=True, indent=4)
        out.write(serialized)
def store_csv(file, recs):
    """Write *recs* to ``<file>.csv`` with a header row.

    The columns, and their order, come from the module-level
    ``column_names`` list.

    Args:
        file: Output path without the extension.
        recs: Iterable of dicts, one per row. Keys missing from a record
            produce empty cells.

    Raises:
        ValueError: If a record contains a key that is not in
            ``column_names`` (the ``csv.DictWriter`` default).
    """
    # newline='' lets the csv module control line endings itself; without it
    # every row is followed by a blank line on Windows. The explicit utf-8
    # encoding keeps non-ASCII mail text from failing on other locale defaults.
    with open(file + '.csv', 'w', newline='', encoding='utf-8') as f:
        dict_writer = csv.DictWriter(f, column_names)
        dict_writer.writeheader()
        dict_writer.writerows(recs)
if __name__ == '__main__':
    # Script entry point: log in over IMAP, download the messages of the
    # Gmail "All Mail" folder in chunks, turn each one into a record with
    # decode_email, and save the records as JSON or CSV.
    t0 = time.time()
    ms = imaplib.IMAP4_SSL(imapAddress)  # open imap session ms
    ms.login(username, password)
    if ms.state == "AUTH":
        print("logged in OK")
    else:
        print("login Failed")
        raise SystemExit(1)  # same effect as exit(1), without needing the site module

    try:
        # select all mail folders, this is specific to gmail!
        # NOTE: the double quotes are part of the select
        ms.select('"[Gmail]/All Mail"')
        result, data = ms.uid('search', None, 'ALL')  # uids of all emails
        uids = data[0].split()  # parse into array
        n = len(uids)  # get number of all emails
        print(f"Total Number of emails: {n}")
        # To limit the run, re-enable:  if endAt: n = endAt  # override n if endAt is set
        recs = []
        for i in range(start, n, chunk):  # fetch `chunk` emails each time
            srange = f'{i+1}:{min(i+chunk,n)}'
            resp, data = ms.fetch(srange, '(RFC822)')
            # Only tuple items are used; their second element is the raw
            # message handed to decode_email.
            raw_messages = (m[1] for m in data if isinstance(m, tuple))
            for offset, msg in enumerate(raw_messages):
                pos = i + offset + 1
                parts = decode_email(msg, pos, key_map)
                # NOTE(review): assumes the fetched sequence numbers line up
                # with the order of the uid search result - confirm.
                parts['uid'] = str(int(uids[pos - 1]))
                parts['n'] = pos
                try:
                    json.dumps(parts)  # check the record is serializable before keeping it
                except (TypeError, ValueError):
                    # catch some rare errors here; .get() because the message
                    # may have no Message-ID, so the key can be absent
                    print(f"** pos {pos} {parts.get('Rfc822msgid')}:json dump fail")
                    pprint(parts)
                    print('---------')
                else:
                    recs.append(parts)
            t1 = time.time()
            elapsed_mins = (t1 - t0) / 60  # mins from start
            print(f'@ {pos}/{n} {pos/n*100:.1f}% elapsed: {elapsed_mins:.2f} mins')
    finally:
        # Close the session even when the download is interrupted by an error.
        ms.logout()

    if output_type == 'json':
        store_json(ouput_filename, recs)
    else:
        store_csv(ouput_filename, recs)
    print('*** DONE ***')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment