Skip to content

Instantly share code, notes, and snippets.

@erh
Last active July 31, 2021 06:46
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save erh/6029922 to your computer and use it in GitHub Desktop.
Save erh/6029922 to your computer and use it in GitHub Desktop.
This is a very simple python program that reads your email inbox and generates a histogram of how much email is unread in your inbox.
#!/usr/bin/env python
###
### This generates a very simple histogram of unread e-mail in your INBOX.
### This is meant as a toy, and you are free to do whatever you want with the code.
###
# core python libraries
import datetime
import getpass
import imaplib
import re
import rfc822
import sys
import time
# mongo libraries
# the pymongo package includes bson
import bson
import pymongo
# optional- keyring
# IMAP server that print_histogram() connects to.
imapHost = "imap.gmail.com"
# Account name; set from the first command-line argument when run as a script.
imapUser = None
# Password handed to imapclient; None makes imapclient look it up in the
# keyring and, failing that, prompt for it.
imapPassword = None
class imapclient(object):
    """Small IMAP client with an optional MongoDB-backed cache of raw messages.

    Messages are fetched by UID from the currently selected folder and parsed
    into ``{"headers": {...}, "body": "..."}`` dictionaries.  Header names are
    lower-cased; a header that occurs once maps to its value, a header that
    occurs several times maps to a list of values.

    NOTE(review): the cache uses the legacy pymongo calls ``Connection()`` and
    ``Collection.save()`` -- confirm the installed pymongo still provides them.
    """

    def __init__(self, host, user, secure=True, pwd=None, cache=False):
        """Log in to *host* as *user* and select INBOX read-only.

        host   -- IMAP server host name.
        user   -- account name; also used as the keyring lookup key.
        secure -- true: IMAP4_SSL on port 993; false: plain IMAP4.
        pwd    -- password.  When None it is read from the keyring; if that
                  yields nothing the user is prompted and the answer is
                  stored in the keyring (best effort).
        cache  -- true: cache raw messages in the ``mail_cache.raw`` MongoDB
                  collection.
        """
        self.host = host
        self.user = user
        self.pwd = pwd

        if pwd is None:
            # keyring is optional, so any failure here is reported and ignored.
            try:
                import keyring
                pwd = keyring.get_password(host, user)
            except Exception as e:
                print("can't get password from keyring: " + str(e))

            if pwd is None:
                pwd = getpass.getpass()
                try:
                    import keyring
                    keyring.set_password(host, user, pwd)
                except Exception as e:
                    print("can't save password: " + str(e))

        if secure:
            self.mailbox = imaplib.IMAP4_SSL(host, 993)
        else:
            self.mailbox = imaplib.IMAP4(host)
        self.mailbox.login(user, pwd)
        self.select("INBOX")

        self.cache = None
        if cache:
            self.cache = pymongo.Connection().mail_cache.raw

    def _parse(self, res):
        """Return the data part of an imaplib ``(status, data)`` reply.

        Raises Exception when the status is not "OK".
        """
        if res[0] != "OK":
            raise Exception("error: %s" % str(res[0]))
        return res[1]

    def select(self, name, readonly=True):
        """Select folder *name* and remember it for building cache ids."""
        self.mailbox.select(name, readonly=readonly)
        self.folder = name

    def list(self):
        """Return the UIDs of all messages in the selected folder.

        Raises Exception when the server does not answer the search with "OK".
        """
        res = self.mailbox.uid("search", "ALL")
        return self._parse(res)[0].split()

    def _parse_headered(self, txt):
        """Split raw message text into ``(headers, remaining_text)``.

        Header parsing stops at the first empty line; everything after it is
        returned untouched as the second tuple element.
        """
        headers = {}
        prev = ""
        while True:
            line, _sep, txt = txt.partition("\n")
            line = line.replace("\r", "")
            if not line:
                break
            if line[0].isspace():
                # Folded header: a line starting with whitespace continues
                # the previous header.
                prev += "\n" + line
                continue
            if prev:
                self._add_header(headers, prev)
            prev = line
        self._add_header(headers, prev)

        # Collapse single-occurrence headers from [value] to value.
        for name in headers:
            if len(headers[name]) == 1:
                headers[name] = headers[name][0]
        return (headers, txt)

    def _add_header(self, headers, line):
        """Parse one ``Name: value`` line and append it to *headers*."""
        line = line.rstrip()
        if not line:
            return
        name, _sep, value = line.partition(":")
        name = name.lower()
        value = self._cleanSingleHeader(name, value.strip())
        headers.setdefault(name, []).append(value)

    def _convert_raw(self, txt):
        """Parse raw message text into ``{"headers": ..., "body": ...}``.

        On a parse failure the offending text is printed and the exception is
        re-raised.
        """
        try:
            headers, body = self._parse_headered(txt)
            return {"headers": headers, "body": body}
        except Exception:
            print("couldn't parse")
            print(txt)
            raise

    def _cleanID(self, foo):
        """Normalise a message id: lower-case, trimmed, angle brackets removed.

        A value without exactly one ``<`` and one ``>`` is returned as is,
        unless it contains a space, in which case Exception is raised.
        """
        foo = foo.lower().strip()
        if foo.count("<") != 1 or foo.count(">") != 1:
            if foo.count(" "):
                raise Exception("bad id [%s]" % foo)
            return foo
        foo = foo.partition("<")[2]
        foo = foo.partition(">")[0]
        return foo

    def _parse_date(self, value):
        """Convert a Date header to a naive local-time datetime.

        The UTC offset in the header is honoured.  A value that cannot be
        parsed is returned unchanged instead of raising, so one malformed
        message does not abort processing of the mailbox.
        """
        # Local import: keeps this class usable without touching the
        # file-level import list.
        import email.utils
        try:
            parsed = email.utils.parsedate_tz(value)
            if parsed is None:
                return value
            stamp = email.utils.mktime_tz(parsed)
            return datetime.datetime.fromtimestamp(stamp)
        except (TypeError, ValueError, IndexError, OverflowError, OSError):
            return value

    def _cleanSingleHeader(self, name, value):
        """Return the cleaned form of header *name* (already lower-cased).

        message-id  -> normalised id
        to          -> list of comma-separated, trimmed entries
        references  -> list of normalised ids
        in-reply-to -> normalised id, or the raw value if it is malformed
        date        -> datetime (see _parse_date)
        other       -> value unchanged
        """
        if name == "message-id":
            return self._cleanID(value)
        if name == "to":
            return [z.strip() for z in value.split(",")]
        if name == "references":
            return [self._cleanID(x) for x in re.split(r"\s+", value.lower())]
        if name == "in-reply-to":
            try:
                return self._cleanID(value)
            except Exception:
                print("bad id [%s]" % value)
                return value
        if name == "date":
            return self._parse_date(value)
        return value

    def get_cache(self):
        """Return the cache collection, or None when caching is disabled."""
        return self.cache

    def get_id(self, uid):
        """Return the cache key for *uid*: host-user-folder-uid."""
        return self.host + "-" + self.user + "-" + self.folder + "-" + str(uid)

    def fetch(self, uid, headerOnly=False):
        """Return the parsed message *uid*, using the cache when possible.

        uid        -- message UID in the selected folder.
        headerOnly -- true: fetch "(RFC822.HEADER)"; false: fetch "(RFC822)".

        A cached entry is used only when it was stored with the same
        headerOnly flag.  Returns None when the server sends no data for the
        uid.  Raises Exception when the fetch status is not "OK".
        """
        key = self.get_id(uid)

        # Compare against None: pymongo collections should not be truth-tested.
        if self.cache is not None:
            cached = self.cache.find_one({"_id": key})
            if cached and cached["headerOnly"] == headerOnly:
                return self._convert_raw(cached["data"])

        what = "(RFC822.HEADER)" if headerOnly else "(RFC822)"
        typ, data = self.mailbox.uid("fetch", uid, what)
        if typ != "OK":
            raise Exception("failed loading uid: %s typ: %s" % (str(uid), str(typ)))
        if not data or data[0] is None:
            return None

        raw = data[0][1]
        converted = self._convert_raw(raw)

        if self.cache is not None:
            # Caching is best effort; a failed save must not lose the message.
            try:
                self.cache.save({"_id": key,
                                 "headerOnly": headerOnly,
                                 "headers": converted["headers"],
                                 "data": bson.binary.Binary(raw)})
            except Exception as e:
                print("couldn't save message because of: %s" % e)

        return converted
def print_histogram(days=20, out_path="histogram.html"):
    """Write an HTML per-day count of recent INBOX messages.

    Every message returned by the mailbox listing has its headers fetched
    (and thereby cached in MongoDB) and is stamped with this run's
    ``lastSeen`` time.  The cache is then aggregated by the year/month/day of
    ``headers.date`` for messages stamped in this run and dated within the
    last *days* days, and one line per day is written to *out_path*.

    days     -- size of the look-back window in days.
    out_path -- file the HTML report is written to.

    Connection settings come from the module-level imapHost, imapUser and
    imapPassword.
    """
    mailbox = imapclient(imapHost, imapUser, cache=True, pwd=imapPassword)
    mailbox.select("INBOX", False)
    cache = mailbox.get_cache()

    # One stamp per run lets the $match below pick only messages seen now.
    last_seen = time.time()

    all_mail = mailbox.list()
    total = len(all_mail)
    for done, uid in enumerate(all_mail):
        if done % 10 == 1:
            print("%d / %d" % (done, total))
        # Called for its side effect of filling the cache.
        mailbox.fetch(uid, True)
        cache.update({"_id": mailbox.get_id(uid)},
                     {"$set": {"lastSeen": last_seen}})

    date_parts = ["year", "month", "dayOfMonth"]  # , "hour"
    cutoff = datetime.datetime.fromtimestamp(last_seen - (days * 86400))
    project = dict((part, {"$" + part: "$headers.date"}) for part in date_parts)
    group_id = dict((part, "$" + part) for part in date_parts)
    pipeline = [
        {"$match": {"lastSeen": last_seen,
                    "_id": re.compile("INBOX"),
                    "headers.date": {"$gt": cutoff}}},
        {"$project": project},
        {"$group": {"_id": group_id, "total": {"$sum": 1}}},
    ]

    res = cache.aggregate(pipeline)
    if res["ok"] != 1:
        print("aggregation failed: %s" % str(res))
        return

    # Sort newest first on the client: group_id is a plain dict, so the key
    # order inside the grouped _id (and hence a server-side sort on it) is
    # not something this code controls.
    rows = sorted(res["result"],
                  key=lambda row: (row["_id"]["year"],
                                   row["_id"]["month"],
                                   row["_id"]["dayOfMonth"]),
                  reverse=True)

    now = datetime.datetime.now()
    # "with" closes the file even if writing a row raises.
    with open(out_path, "w") as out:
        out.write("<html><body>")
        for row in rows:
            day = row["_id"]
            when = datetime.datetime(day["year"], day["month"], day["dayOfMonth"])
            delta = now - when
            out.write("%d days ago, %d e-mails<br>" % (delta.days, row["total"]))
        out.write("</body></html>")
if __name__ == "__main__":
    # The IMAP user name is the single required command-line argument.
    if len(sys.argv) < 2:
        # Substitute the script name into the usage line.
        print("Usage: python %s <imap username>" % sys.argv[0])
        sys.exit(-1)
    imapUser = sys.argv[1]
    print_histogram()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment