Last active
May 2, 2018 03:49
-
-
Save itsdogtime/7f659ef5510a20c4e0b787fae5d48166 to your computer and use it in GitHub Desktop.
Simple script to manage emails, since Thunderbird filters kind of suck.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
import imaplib | |
import sys | |
import email | |
import logging | |
import logging.handlers | |
import traceback | |
from time import sleep | |
# IMAP account settings -- fill these in before running the script.
# _SERVER is handed to imaplib.IMAP4_SSL() and _USER/_PASS to login() in the
# __main__ block below.
# NOTE(review): avoid committing real credentials in this file.
_USER = ""
_PASS = ""
_SERVER = ""
def setup_logging():
    """Configure and return this module's logger.

    Records at INFO and above go to two places, both formatted as
    ``[dd/mm/YYYY HH:MM:SS] message``:

    * ``logs/mailfilter`` (relative to the current working directory),
      rotated at midnight;
    * the console (stderr, the ``StreamHandler`` default).

    The ``logs`` directory is created when missing, and calling this function
    again returns the already-configured logger instead of attaching a second
    set of handlers (which would duplicate every log line).

    Returns:
        logging.Logger: the logger named after this module.
    """
    import os  # local import: only needed to create the log directory

    logger = logging.getLogger(__name__)
    if logger.handlers:
        # Already configured by an earlier call.
        return logger

    log_path = "logs/mailfilter"
    log_dir = os.path.dirname(log_path)
    # TimedRotatingFileHandler opens the file immediately and fails if the
    # parent directory does not exist.
    if log_dir and not os.path.isdir(log_dir):
        os.makedirs(log_dir)

    formatter = logging.Formatter(
        fmt='[%(asctime)s] %(message)s',
        datefmt='%d/%m/%Y %H:%M:%S',
    )
    logger.setLevel(logging.INFO)

    file_handler = logging.handlers.TimedRotatingFileHandler(
        filename=log_path, when="midnight"
    )
    console_handler = logging.StreamHandler()
    for handler in (file_handler, console_handler):
        handler.setLevel(logging.INFO)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    return logger
def get_header(msg, key):
    """Return header *key* of *msg* as a single lowercased text string.

    RFC 2047 encoded words (``=?charset?q?...?=``) are decoded using their
    declared charset; undecodable bytes are replaced rather than raising.
    A missing header yields ``""``.

    Args:
        msg: an ``email.message.Message``.
        key: header name, e.g. ``"subject"`` (lookup is case-insensitive).

    Returns:
        The decoded header value, lowercased so filters can match
        case-insensitively.
    """
    # ``import email`` alone is not guaranteed to load these submodules.
    from email.errors import HeaderParseError
    from email.header import decode_header

    raw = msg.get(key, "")
    try:
        parts = decode_header(raw)
    except HeaderParseError:
        # Malformed encoded word: fall back to the raw text instead of
        # letting one bad header abort processing.
        parts = [(str(raw), None)]

    pieces = []
    for value, charset in parts:
        if isinstance(value, bytes):
            try:
                value = value.decode(charset or "ascii", "replace")
            except LookupError:
                # Unknown or bogus charset label.
                value = value.decode("ascii", "replace")
        pieces.append(value.lower())
    return "".join(pieces)
def filter_email(m, uid, msg):
    """Run the user-defined filters on one message and apply the outcome.

    The filter section in the middle of this function decides what happens
    by assigning two locals:

    * ``move`` -- name of the mailbox to move the message to, or ``None``
      to leave it where it is;
    * ``seen`` -- ``True`` to leave the message marked as read, ``False``
      to clear the ``\\Seen`` flag again.

    A move is a UID COPY followed by flagging the original ``\\Deleted``;
    the caller is expected to expunge afterwards. If the COPY fails, the
    message stays in place, is marked unseen, and the failure is logged.

    Args:
        m: a logged-in ``imaplib`` connection with a mailbox selected.
        uid: UID of the message (bytes or str, as returned by UID SEARCH).
        msg: the parsed ``email.message.Message`` for that UID.
    """
    # Same logger that setup_logging() configures; looking it up here avoids
    # depending on a global that only exists when run as a script.
    log = logging.getLogger(__name__)
    uid_text = uid.decode("ascii", "replace") if isinstance(uid, bytes) else uid

    move = None
    seen = False

    # Values available to the filter rules below (headers are lowercased).
    body = get_text(msg)
    subj = get_header(msg, "subject")
    to = get_header(msg, "to")
    cc = get_header(msg, "cc")
    _from = get_header(msg, "from")
    repl = get_header(msg, "reply-to")

    #----------------------------------------------
    #            /!\ FILTERS HERE /!\
    #----------------------------------------------

    if not seen:
        # Fetching the full message normally sets \Seen on the server
        # (RFC 3501), so undo that when the filters did not ask for it.
        m.uid("STORE", uid, "-FLAGS", r"(\Seen)")
    seen_label = "seen" if seen else "unseen"

    if not move:
        log.info(
            "Did not move [%s] \"%s\" as it did not match any filters.",
            uid_text, subj,
        )
        return

    retcode, data = m.uid("COPY", uid, move)
    if retcode == "OK":
        m.uid("STORE", uid, "+FLAGS", r"(\Deleted)")
        log.info(
            "Moved [%s] \"%s\" to \"%s\" and marked as %s.",
            uid_text, subj, move, seen_label,
        )
    else:
        # Leave the message unseen so it is picked up again on the next pass.
        m.uid("STORE", uid, "-FLAGS", r"(\Seen)")
        log.error(
            "Failed to move [%s] \"%s\" to \"%s\" (%s); left in place and marked as unseen.",
            uid_text, subj, move, retcode,
        )
def get_text(msg):
    """Return the decoded payload of the first leaf part of *msg*.

    For a multipart message this descends into the first sub-part at every
    level until a non-multipart part is reached. No attempt is made to pick
    a ``text/plain`` part specifically, so the result is whatever part comes
    first in the message.

    Args:
        msg: an ``email.message.Message``.

    Returns:
        The part's payload with its Content-Transfer-Encoding undone, as
        returned by ``get_payload(decode=True)``, or ``None`` when a
        multipart container has no parts.
    """
    part = msg
    while part.is_multipart():
        children = part.get_payload()
        if not children:
            # Empty container: nothing to return (previously an IndexError).
            return None
        part = children[0]
    return part.get_payload(None, True)
if __name__ == "__main__":
    # Poll the default mailbox every 30 seconds and run filter_email() on
    # every unseen message until interrupted or an error occurs.

    # Deliberately a module-level name: other code in this file may log
    # through a global called ``logger``.
    logger = setup_logging()

    m = imaplib.IMAP4_SSL(_SERVER)
    m.login(_USER, _PASS)  # raises imaplib.IMAP4.error on bad credentials
    m.select()  # Set mailbox (to INBOX by default)

    # FETCH returns bytes on Python 3 and str on Python 2; pick the matching
    # parser once instead of per message.
    parse_message = getattr(email, "message_from_bytes", email.message_from_string)

    exit_code = 0
    try:
        while True:
            retcode, messages = m.uid("SEARCH", None, "(UNSEEN)")
            if retcode == "OK" and messages[0]:
                # split() without a separator works for both bytes and str.
                for uid in messages[0].split():
                    typ, data = m.uid("FETCH", uid, "(RFC822)")
                    if typ != "OK" or not data or not isinstance(data[0], tuple):
                        # E.g. the message vanished between SEARCH and FETCH.
                        logger.error("Could not fetch [%s]; skipping.", uid)
                        continue
                    filter_email(m, uid, parse_message(data[0][1]))
                # Physically remove messages flagged \Deleted by the filters.
                m.expunge()
            sleep(30)
    except KeyboardInterrupt:
        logger.info("Keyboard Interrupt. Closing.")
    except Exception as exc:
        # Report the innermost frame, i.e. where the exception was raised.
        _, lineno, _, _ = traceback.extract_tb(sys.exc_info()[2])[-1]
        logger.error(
            "Error: [line {}] {}: {}".format(lineno, type(exc).__name__, exc)
        )
        exit_code = 1
    finally:
        # Always release the mailbox and the connection, whatever happened.
        try:
            m.close()
            m.logout()
        except Exception as exc:
            logger.warning("Error while closing the connection: %s", exc)
    sys.exit(exit_code)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment