Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
A script to invoke getmail when accounts receive new mail. See http://wrouesnel.github.io/articles/A%20better%20Getmail%20IDLE%20client/ for more details.
#!/usr/bin/env python
# Getmail Idler Script v2
# Will Rouesnel
#
# This is a pure Python threading affair - there are better ways to do this
# (such as epoll), and there are much better libraries (like gevent) for that
# type of thing.
#
# Note: this script assumes you're running getmail with the "delete" option for
# the mailbox it checks - i.e. your inbox should always be cleared of all mail.
# If you are not, then you need to edit the IDLE response that is searched for
# so that it is not "EXISTS" ("NEW" would be more appropriate).
# The notify_mail function also assumes your system has a usable sendmail
# command of some sort that makes sense for your inbox - this may not be the
# case.
from imapclient import IMAPClient
import time
import threading
import subprocess
import argparse
import signal
import psutil
import sys
import os
import ConfigParser
import pwd
import traceback
import Queue
import logging
import logging.handlers
import socket
# use this to flash our emergency mail on a hard shutdown
from email.mime.text import MIMEText
# Event classes for dispatch queues. Idlers block on the queue for events and
# data. These are used in addition to specific watches in the idler class.
class EventShutdown(object):
    """Event announcing that the application is shutting down.

    ``traceback_string`` optionally carries the formatted traceback that
    triggered the shutdown; it defaults to the empty string.
    """

    def __init__(self, traceback_string=""):
        # Stored as given; consumers compare it against "" or None.
        self.traceback_string = traceback_string
class EventMainThreadFinalizeShutdown(EventShutdown):
    """Event asking the main thread to finalize the shutdown.

    Unlike the base class, ``traceback_string`` must be supplied explicitly.
    """

    def __init__(self, traceback_string):
        # Single inheritance: calling the base initializer directly is
        # equivalent to going through super().
        EventShutdown.__init__(self, traceback_string)
class EventIdlerFatalException(EventShutdown):
    """Event reporting a fatal exception inside an Idler.

    ``traceback_string`` is the formatted traceback and ``ingress_queue`` is
    the queue of the failed Idler, so the receiver knows which queue to
    disconnect.
    """

    def __init__(self, traceback_string, ingress_queue):
        EventShutdown.__init__(self, traceback_string)
        # Queue to disconnect because of this exception.
        self.ingress_queue = ingress_queue
class EventCaughtSignal(object):
    """Event recording a signal caught by the process signal handler.

    Holds the signal number and the frame object passed to the handler.
    """

    def __init__(self, signal, frame):
        self.signal, self.frame = signal, frame
class EventOnline(object):
    """Idler is connected to IMAP."""


class EventGoingOnline(object):
    """Idler has just reconnected."""


class EventGoingOffline(object):
    """Idler has just disconnected."""


class EventOffline(object):
    """Idler is not connected to IMAP."""


class EventNewMail(object):
    """Idler has received new mail."""


class EventIdleTerminated(object):
    """Idler was terminated by the server and should reconnect silently."""


class EventBootstrap(object):
    """Sent by the manager to bootstrap startup."""
# Idler queue should no longer be posted to (object is being deleted)
class EventDisconnectQueue(object):
    """Event telling the receiver to stop posting to an Idler's queue.

    ``ingress_queue`` identifies the queue that is going away.
    """

    def __init__(self, ingress_queue):
        # Kept by reference so the receiver can match it against its own list.
        self.ingress_queue = ingress_queue
# Special traceback_string to shutdown threads
class EventShutdownException(Exception):
    """Raised inside an Idler's event loop to unwind and stop its thread."""
# Sends notification mail to clients about server status
def notify_mail(subject, message, idler=None):
    """Mail a status notification to the local user running this process.

    subject -- short text appended to the "Getmail Idler: " subject prefix.
    message -- body of the mail.
    idler   -- optional Idler; when given, its username and host are
               appended to the body on a separate line.

    The mail is addressed from and to the login name of the current uid and
    handed to /usr/sbin/sendmail. Delivery is best-effort: any failure is
    logged and never propagated to the caller.
    """
    if idler is not None:
        message = message + '\n' + "%s %s" % (idler.username, idler.host)
    try:
        # Looked up inside the try block so a uid without a passwd entry
        # cannot crash the caller over a mere notification.
        local_user = pwd.getpwuid(os.getuid())[0]
        msg = MIMEText(message)
        msg["From"] = local_user
        msg["To"] = local_user
        msg["Subject"] = "Getmail Idler: %s" % subject
        payload = msg.as_string()
        if not isinstance(payload, bytes):
            # The pipe wants bytes; on Python 2 str already is bytes.
            payload = payload.encode("utf-8")
        p = subprocess.Popen(['/usr/sbin/sendmail', '-t'],
                             stdin=subprocess.PIPE)
        p.communicate(payload)
        if p.returncode != 0:
            logger.error("sendmail exited with status %s while sending "
                         "notification email!" % p.returncode)
    except Exception:
        logger.error("Got traceback_string trying to send notification email! %s"
                     % traceback.format_exc())
# This is the object which will manage an IMAP instance
class Idler(object):
    """Watch one IMAP inbox with IDLE and run getmail when mail arrives.

    Each Idler owns an event-loop thread (``do_events``) that consumes events
    from ``ingress_queue`` and dispatches them to the handler methods below.
    Handlers drive a small state machine by posting follow-up events to the
    same queue. Events meant for the Idler's creator (fatal errors, queue
    disconnects) are posted to ``egress_queue``.
    """

    # Seconds to wait between login retries while the server is unreachable.
    RECONNECT_INTERVAL = 5.0

    def __init__(self, getmailConfigFile, ingress_queue, egress_queue):
        """Read the IMAP settings from a getmailrc file.

        getmailConfigFile -- path to the getmailrc; if that path does not
            exist, the same basename is looked up under ~/.getmail/.
        ingress_queue -- queue of events for this idler to act on.
        egress_queue  -- queue of events for the idler's creator to act on.

        Does not raise on configuration problems: an EventIdlerFatalException
        carrying the traceback is posted to ``egress_queue`` instead, and
        ``config_loaded`` stays False.
        """
        # Event management
        self.ingress_queue = ingress_queue  # Events for the idler to act on
        self.egress_queue = egress_queue    # Events for the idler's creator
        self.client = None                  # IMAPClient instance
        self.idle_active = threading.Event()  # Is the server in IDLE?
        self.thread_event = None            # event processor thread
        self.thread_reconnect = None        # reconnect timer, if running
        self.thread_idle = None             # IDLE wait thread, if running
        # note: this has no real use at the moment
        self.thread_idleforcerestart = None  # timer to force-reset IDLE locally

        # IMAP server data
        self.host = ""
        self.username = ""
        self.password = ""
        self.ssl = False
        self.getmailConfig = None
        # Set only once the configuration has been parsed successfully.
        self.config_loaded = False

        try:
            logger.debug("Trying to load %s"
                         % os.path.basename(getmailConfigFile))
            config = ConfigParser.ConfigParser()
            # Find the real config file
            alt_config_file = os.path.join(os.path.expanduser("~"), ".getmail",
                                           os.path.basename(getmailConfigFile))
            if os.path.exists(getmailConfigFile):
                self.getmailConfig = getmailConfigFile
            elif os.path.exists(alt_config_file):
                self.getmailConfig = alt_config_file
            else:
                logger.error("Config file could not be found! %s"
                             % os.path.basename(getmailConfigFile))
                raise Exception("No config file found!")
            config.read(self.getmailConfig)
            logger.debug("Loaded config file!")

            retriever_type = config.get("retriever", "type")
            if "IMAP" not in retriever_type:
                raise Exception("Retriever is not an IMAP type." +
                                " IDLE cannot be used!")
            self.ssl = "SSL" in retriever_type
            self.host = config.get("retriever", "server")
            self.username = config.get("retriever", "username")
            self.password = config.get("retriever", "password")
            logger.debug("Parsed configuration from getmailrc file!")
            self.config_loaded = True
        except Exception:
            # Just die for now
            self.egress_queue.put(
                EventIdlerFatalException(traceback.format_exc(),
                                         self.ingress_queue))

    def start(self):
        """Start the event-loop thread for this idler.

        Raises Exception if the idler has already been started. Does nothing
        when configuration loading failed in ``__init__``: the failure has
        already been reported on the egress queue, and a thread started here
        would never receive a shutdown event and would keep the process alive.
        """
        # This is the one place we don't use the queues, since we should only
        # get invoked once.
        if self.thread_event is not None:
            raise Exception("Idler is already running.")
        if not self.config_loaded:
            logger.debug("Not starting idler: configuration failed to load.")
            return
        self.thread_event = threading.Thread(
            target=self.do_events,
            name="event-%s-%s-thread_event" % (self.username, self.host))
        self.thread_event.start()

    def do_events(self):
        """Event loop: block on the ingress queue and dispatch each event.

        Runs until a handler raises. EventShutdownException is the orderly
        exit and posts EventDisconnectQueue to the egress queue; any other
        exception (including an event type with no handler) posts
        EventIdlerFatalException with the traceback.
        """
        try:
            # Bind event handlers to object functions. Dispatch is on the
            # exact event class, so subclasses are not matched.
            handlers = {
                EventShutdown: self.shutdown,
                EventOffline: self.offline,
                EventGoingOffline: self.going_offline,
                EventGoingOnline: self.going_online,
                EventOnline: self.online,
                EventNewMail: self.newmail,
                EventIdleTerminated: self.idle_terminated,
                EventBootstrap: self.bootstrap,
            }
            while True:
                event = self.ingress_queue.get()
                logger.debug("%s got event %s "
                             % (self.thread_event.name,
                                event.__class__.__name__))
                handlers[type(event)]()
        except EventShutdownException:  # queued shutdown
            logger.debug("Notify egress_queue thread is exiting")
            self.egress_queue.put(EventDisconnectQueue(self.ingress_queue))
        except Exception:  # unhandled exception - post shutdown
            logger.debug("Unhandled exception!. Shutting down thread...")
            traceback_string = traceback.format_exc()
            self.egress_queue.put(
                EventIdlerFatalException(traceback_string,
                                         self.ingress_queue))
        logger.debug("Thread terminating.")

    def shutdown(self):
        """Handle EventShutdown: drop the connection and stop the event loop.

        Always raises EventShutdownException so that ``do_events`` unwinds.
        """
        # Logout the IMAP process - this unblocks blocking IDLE calls with an
        # exception. There doesn't seem to be a clean way to shut them down
        # otherwise. This also causes logout to throw exceptions, but the net
        # effect seems to be we terminate our server connection.
        logger.debug("Logging out of IMAP server...")
        if self.client is not None:
            try:
                self.client.logout()
            except Exception:
                # Expected while IDLE is active; the connection is going away.
                logger.debug("Ignoring error during logout: %s"
                             % traceback.format_exc())
        logger.debug("Releasing IMAPClient context")
        self.client = None
        # If we're currently reconnecting then issue a stop. If we do
        # reconnect then it won't matter since the event loop is going down
        # after this function, but we need to shut down the timer thread.
        if self.thread_reconnect is not None:
            logger.debug("Cancelling reconnect before thread shutdown...")
            self.thread_reconnect.cancel()
        if self.thread_idleforcerestart is not None:
            logger.debug("Cancelling IDLE force restart timer...")
            self.thread_idleforcerestart.cancel()
        raise EventShutdownException()

    def bootstrap(self):
        """Handle EventBootstrap: try an immediate login, then go on/offline."""
        logger.info("Bootstrapping thread %s %s" % (self.username, self.host))
        if self._imaplogin():
            self.ingress_queue.put(EventGoingOnline())
        else:
            self.ingress_queue.put(EventGoingOffline())

    def idle_terminated(self):
        """Handle EventIdleTerminated: reconnect once, go offline on failure."""
        logger.info("IDLE terminated. Reconnecting %s %s"
                    % (self.username, self.host))
        if self._imaplogin():
            # Silent reconnect: skip the "going online" notification.
            self.ingress_queue.put(EventOnline())
        else:
            self.ingress_queue.put(EventGoingOffline())

    def going_offline(self):
        """Handle EventGoingOffline: notify the user, then enter offline."""
        logger.info("%s : %s OFFLINE." % (self.username, self.host))
        notify_mail("%s %s OFFLINE!" % (self.username, self.host),
                    "IMAP server login failed. Retrying every 5 seconds..",
                    idler=self)
        self.ingress_queue.put(EventOffline())

    def going_online(self):
        """Handle EventGoingOnline: notify the user, then enter online."""
        logger.info("%s : %s ONLINE." % (self.username, self.host))
        notify_mail("%s %s ONLINE!" % (self.username, self.host),
                    "IMAP server login SUCCEEDED.", idler=self)
        self.ingress_queue.put(EventOnline())

    def online(self):
        """Handle EventOnline: fetch pending mail or put the server in IDLE."""
        try:
            # Check for pending mail BEFORE IDLE; IDLE only triggers on
            # activity.
            response = self.client.select_folder("INBOX")
            if self.has_mail([(response["EXISTS"], "EXISTS")]):
                # Server has mail, let the event loop fetch it.
                self.ingress_queue.put(EventNewMail())
                return
            # Start IDLE mode
            self.client.idle()
            self.idle_active.set()  # Server is now in IDLE
            # Wait for IDLE activity on another thread
            self.thread_idle = threading.Thread(
                target=self.idle,
                name="idle_active-%s-%s-thread_event"
                     % (self.username, self.host))
            self.thread_idle.start()
        except IMAPClient.AbortError:
            logger.info("Abort Error while trying to set IDLE.")
            logger.debug("Traceback: %s" % traceback.format_exc())
            self.ingress_queue.put(EventIdleTerminated())

    def has_mail(self, response):
        """Return True if the last EXISTS item in ``response`` is above zero.

        response -- sequence of (value, name) items; only items whose second
            element equals "EXISTS" are considered, and the last one wins.
        """
        exists = 0  # Num waiting messages
        for item in reversed(response):  # Parse for last exists update
            if item[1] == "EXISTS":
                exists = item[0]
                break
        return exists > 0

    def idle(self):
        """Body of the IDLE wait thread.

        Listens until an IDLE response arrives, then posts one follow-up
        event to the ingress queue and terminates. This gives the event loop
        a chance to shut down between IDLE rounds. ``idle_active`` is cleared
        before the follow-up event is posted in every path.
        """
        try:
            logger.info("Waiting IDLE response %s %s"
                        % (self.username, self.host))
            response = self.client.idle_check()
            self.client.idle_done()   # Break IDLE session
            self.idle_active.clear()  # Server is out of IDLE
            # Process IDLE to see if server has mail
            if self.has_mail(response):
                logger.info("IDLE response - server has mail %s %s"
                            % (self.username, self.host))
                self.ingress_queue.put(EventNewMail())
            else:
                logger.info("IDLE response - no mail %s %s"
                            % (self.username, self.host))
                self.ingress_queue.put(EventOnline())
            self.thread_idle = None  # Thread will finish after this.
        # Connection errors. The most specific class must come first,
        # otherwise the read-only branch can never run.
        # NOTE(review): assumes IMAPClient's ReadOnlyError/AbortError/Error
        # alias imaplib's readonly < abort < error hierarchy - confirm.
        except IMAPClient.ReadOnlyError:
            logger.info("Mailbox went read only in IDLE thread.")
            self.idle_active.clear()
            # Go straight back online, which re-selects the folder.
            self.ingress_queue.put(EventOnline())
        except IMAPClient.AbortError:
            logger.info("Abort Error in IDLE thread.")
            logger.debug("%s", traceback.format_exc())
            self.idle_active.clear()
            # Abort errors tend to be disconnects
            self.ingress_queue.put(EventIdleTerminated())
        except IMAPClient.Error:
            logger.warning("Error in IDLE thread.")
            logger.debug("%s", traceback.format_exc())
            self.idle_active.clear()
            # IMAPClient errors tend not to be disconnects
            self.ingress_queue.put(EventOnline())
        except Exception:
            # These types of errors shouldn't happen but we want to recover.
            logger.warning("Exception in IDLE thread. %s %s"
                           % (self.username, self.host))
            logger.warning("%s", traceback.format_exc())
            self.idle_active.clear()
            self.ingress_queue.put(EventIdleTerminated())

    def newmail(self):
        """Handle EventNewMail: run getmail for this config, then go online.

        A non-zero getmail exit status is logged and mailed to the user; the
        idler returns to the online state either way.
        """
        logger.info("starting Getmail download: %s" % self.getmailConfig)
        output = ""
        try:
            # getmailexe is the module-level path chosen on the command line.
            output = subprocess.check_output(
                [getmailexe, "-r", self.getmailConfig])
            logger.debug("%s" % output)
            logger.info("Finished getmail download.")
        except subprocess.CalledProcessError as e:
            output = e.output
            logger.warning("Non-zero return for getmail (%s) %s" %
                           (self.getmailConfig, e.returncode))
            notify_mail("Warn: non-zero getmail return! %s %s "
                        % (self.username, self.host),
                        output, idler=self)
            logger.warning("%s" % output)
        self.ingress_queue.put(EventOnline())
        logger.debug("Getmail download complete: %s" % self.getmailConfig)

    def offline(self):
        """Handle EventOffline: schedule a login retry.

        The Timer object itself is kept in ``thread_reconnect`` so that
        ``shutdown`` can cancel it (``Timer.start()`` returns None, so its
        result must not be stored).
        """
        timer = threading.Timer(self.RECONNECT_INTERVAL, self.login_loop)
        self.thread_reconnect = timer
        timer.start()

    def login_loop(self):
        """Timer callback from ``offline``: attempt one login and report it."""
        if self._imaplogin():
            self.ingress_queue.put(EventGoingOnline())
        else:
            self.ingress_queue.put(EventOffline())
        self.thread_reconnect = None  # Clear timer context from Idler

    def _imaplogin(self):
        """Open a fresh IMAP connection and log in.

        Returns True on success and False on any failure (which is logged).
        """
        try:
            # There doesn't seem to be a reason to ever try and login while
            # reusing the original IMAPClient object, and it's caused more
            # trouble than it's worth. Always issue a logout (just in case),
            # then drop the IMAPClient and make a new one.
            if self.client is not None:
                try:
                    self.client.logout()
                except Exception:
                    # A dead connection may fail in many ways; none of them
                    # should prevent the reconnect below.
                    logger.debug("IMAPClient is logged out.")
                self.client = None
                logger.debug("IMAPClient cleared.")
            # Make a new connection and login
            logger.info("Attempting connection to %s" % (self.host))
            self.client = IMAPClient(self.host, use_uid=True, ssl=self.ssl)
            logger.info("Attempting login for %s" % (self.host))
            self.client.login(self.username, self.password)
            logger.info("Login succeeded %s" % (self.host))
            return True
        except Exception:
            logger.info("IMAP login failed.")
            logger.debug("%s" % traceback.format_exc())
            return False
### MAIN PROGRAM STARTS HERE ###
# Command line definition.
parser = argparse.ArgumentParser()
parser.add_argument("-r",
                    help="getmail configuration file to use" +
                         " (can specify more then once)", action="append",
                    dest="getmailconfigs", metavar='GETMAILRC')
parser.add_argument("--pid-file", "-p", nargs=1,
                    help="pidfile to use for process limiting", action="store",
                    dest="pidfile")
parser.add_argument("--verbose", "-v",
                    help="set output verbosity", action="count",
                    dest="verbosity")
parser.add_argument("--daemonize",
                    help="should process daemonize?", action="store_true",
                    dest="daemonize")
parser.add_argument("--logfile", nargs=1,
                    help="file to redirect log output too (useful for daemon mode)",
                    action="store", dest="logfile")
parser.add_argument("--getmail-exe", nargs=1,
                    help="path to getmail executable if non-standard",
                    action="store", dest="getmailexe")
args = parser.parse_args()

fd_keep = []  # Descriptors to preserve on daemonize

# argparse leaves every option that was not supplied as None, so each value
# is normalized before use; otherwise running without -v, -p or -r raised
# TypeError.
if args.getmailconfigs is None:
    # Later code inspects args.getmailconfigs directly, so fix it in place.
    args.getmailconfigs = []

# Each -v lowers the logging threshold by one level, starting from ERROR (40)
# and never dropping below 0.
verbosity = max(40 - ((args.verbosity or 0) * 10), 0)
logfile = os.path.realpath(args.logfile[0]) if args.logfile else None
pidfile = os.path.realpath(args.pidfile[0]) if args.pidfile else None
daemonize = args.daemonize
getmailexe = args.getmailexe[0] if args.getmailexe else "getmail"
# Resolved now because daemonizing later changes the working directory.
getmailconfigs = [os.path.realpath(path) for path in args.getmailconfigs]
# Configure logging: one formatter for the console, a more detailed one for
# the optional log file.
log_formatter = logging.Formatter('%(levelname)s: %(message)s')
file_formatter = logging.Formatter('%(asctime)-15s: %(levelname)s: %(funcName)s : %(message)s')

logger = logging.getLogger()
logger.setLevel(verbosity)

# Console logging
ch = logging.StreamHandler()
ch.setFormatter(log_formatter)
ch.setLevel(verbosity)
logger.addHandler(ch)

# File logging (only when --logfile was given)
if logfile:
    fh = logging.handlers.RotatingFileHandler(logfile, backupCount=3,
                                              maxBytes=1048576)
    fh.setFormatter(file_formatter)
    fh.setLevel(verbosity)
    logger.addHandler(fh)
    # The log file descriptor must survive the daemonize fd sweep.
    fd_keep.append(fh.stream.fileno())
# Exit in error early if we somehow have no config data (also covers the
# case where no -r option was given at all).
if not args.getmailconfigs:
    logger.error("No Getmail configurations specified - exiting now.")
    sys.exit(1)

# Is process already running?
if pidfile and os.path.isfile(pidfile):
    # Something was running - is it still?
    try:
        with open(pidfile, 'r') as pid_handle:
            pid = int(pid_handle.readline().strip())
    except (IOError, ValueError):
        # Unreadable or corrupt pidfile: treat it like a stale one.
        pid = None
        logger.debug("Could not read a PID from %s" % pidfile)
    if pid is not None and psutil.pid_exists(pid):
        sys.exit(0)
    logger.debug("Found a stale PID file: %s" % pidfile)

# Check for GetMail executable usability. The configured executable is
# probed (not a hard-coded "getmail") and no shell is involved.
FNULL = open(os.devnull, 'w')
try:
    getmail_status = subprocess.call([getmailexe, "--version"], stdout=FNULL,
                                     stderr=subprocess.STDOUT)
except OSError:
    # Executable missing or not runnable.
    getmail_status = None
finally:
    FNULL.close()
if getmail_status != 0:
    logger.error("Getmail executable not available. It might not be installed.\n")
    sys.exit(2)

# Check if a pidfile was specified and warn
if not args.pidfile:
    logger.warning("Running without a pidfile. Multiple executions may occur.")
# This is the time to daemonize if we're going to (classic double fork).
if daemonize:
    try:
        pid = os.fork()
        if pid > 0:
            # exit first parent
            os._exit(0)
    except OSError as e:
        logger.error("fork #1 failed: %d (%s)" % (e.errno, e.strerror))
        sys.exit(1)

    # decouple from parent environment
    os.chdir("/")
    os.setsid()
    os.umask(0)

    # do second fork
    try:
        pid = os.fork()
        if pid > 0:
            # exit from second parent after reporting the daemon's PID
            logger.info("Daemonized with PID %s" % pid)
            os._exit(0)
    except OSError as e:
        logger.error("fork #2 failed: %d (%s)" % (e.errno, e.strerror))
        sys.exit(1)

    # From here on we are the daemon. The console handler is of no further
    # use, so drop it in this process (not in the parent that just exited).
    logger.removeHandler(ch)
    ch.close()

    # Flush the std streams while their descriptors are still open.
    sys.stdout.flush()
    sys.stderr.flush()

    # Close every inherited file descriptor except the ones used for logging.
    import resource
    maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
    if maxfd == resource.RLIM_INFINITY:
        maxfd = 1024  # fallback sweep limit when the hard limit is unbounded
    next_fd = 0
    for kept_fd in sorted(set(fd_keep)):
        # closerange ignores descriptors that are not open.
        os.closerange(next_fd, kept_fd)
        next_fd = kept_fd + 1
    os.closerange(next_fd, maxfd)

    # Redirect std streams to /dev/null
    devnull_fd = os.open(os.devnull, os.O_RDWR)
    std_fds = [stream.fileno()
               for stream in (sys.stdin, sys.stdout, sys.stderr)]
    for std_fd in std_fds:
        os.dup2(devnull_fd, std_fd)
    if devnull_fd not in std_fds:
        os.close(devnull_fd)

# Write new pid file (only when one was requested)
if pidfile:
    with open(pidfile, 'w') as pid_handle:
        pid_handle.write(str(os.getpid()))
# manager_queue carries events to the manager thread. The Idler workers post
# to it (it is passed to them as their egress queue) and so does the main
# thread.
manager_queue = Queue.Queue()
# main_queue carries events to the main thread; the signal handler and the
# manager post to it.
main_queue = Queue.Queue()
def handle_signal(sig, frame):
    """Signal handler: log the signal and hand it over to the main loop."""
    logger.info("Caught signal %s", sig)
    # The real work happens in the main thread, which reads main_queue.
    caught = EventCaughtSignal(sig, frame)
    main_queue.put(caught)


# Route every signal that should stop the daemon through handle_signal.
for sig in (signal.SIGINT, signal.SIGHUP, signal.SIGQUIT, signal.SIGABRT,
            signal.SIGTERM):
    signal.signal(sig, handle_signal)
# The manager function runs in it's own thread_event and is responsible for
# setting up and tearing down workers.
def manager(manager_queue, main_queue):
    """Create the Idler workers and supervise them until all have stopped.

    manager_queue -- queue this function reads; Idlers and the main thread
                     post events to it.
    main_queue    -- queue read by the main thread; exactly one
                     EventMainThreadFinalizeShutdown is posted to it when the
                     manager is done (with the collected tracebacks, or "" on
                     a clean shutdown).

    One Idler is created per entry in the module-level ``getmailconfigs``.
    Before returning, SIGINT is sent to this process to wake the main thread.
    """
    # We need to keep track of the Idlers' queues
    idler_queues = []

    def broadcast_shutdown():
        # Ask every idler that is still connected to shut down.
        for queue in idler_queues:
            logger.debug("Requesting idler shutdown...")
            queue.put(EventShutdown())

    def disconnect(queue):
        # Forget a queue; tolerate one that was already removed (an idler may
        # report more than one terminal event).
        try:
            idler_queues.remove(queue)
        except ValueError:
            logger.debug("Queue was already disconnected.")

    try:
        # Create Idlers
        for getmailrc in getmailconfigs:
            new_queue = Queue.Queue()
            Idler(getmailConfigFile=getmailrc, ingress_queue=new_queue,
                  egress_queue=manager_queue).start()
            # We'd have to store more information to talk to specific queues,
            # but we only need to broadcast to all of them.
            idler_queues.append(new_queue)

        # Bootstrap idlers by trying a first chance login
        for queue in idler_queues:
            queue.put(EventBootstrap())

        # Listen for events
        traceback_string = ""
        while idler_queues:  # loop until we have nothing more to talk to
            response = manager_queue.get()
            # Exact type checks on purpose: EventIdlerFatalException is a
            # subclass of EventShutdown but is handled differently.
            kind = type(response)
            if kind is EventIdlerFatalException:  # fatal exception in idler
                logger.info("Fatal exception in idler!")
                if response.traceback_string is not None:
                    logger.debug("Exception includes traceback")
                    # stash the traceback to pass on to main
                    traceback_string += response.traceback_string
                # The idler is dead, so remove its queue now.
                logger.debug("Disconnecting dead queue...")
                disconnect(response.ingress_queue)
                broadcast_shutdown()
            elif kind is EventShutdown:  # we were issued a shutdown
                logger.info("Shutdown requested")
                broadcast_shutdown()
            elif kind is EventDisconnectQueue:  # disconnect a queue
                logger.debug("Removing disconnected queue.")
                disconnect(response.ingress_queue)
            else:
                logger.warning("Got unknown event in manager: %s"
                               % type(response))
        main_queue.put(EventMainThreadFinalizeShutdown(traceback_string))
    except Exception:
        # Capture the traceback first so nothing below can replace it.
        traceback_string = traceback.format_exc()
        logger.error("Caught traceback_string in manager! Shutting down hard!")
        broadcast_shutdown()
        main_queue.put(EventMainThreadFinalizeShutdown(traceback_string))
    # Wake the main thread by sending a signal to it
    logger.info("Manager sending SIGINT to main thread_event...")
    os.kill(os.getpid(), signal.SIGINT)
# Start the manager
try:
    manager_thread = threading.Thread(target=manager,
                                      name="manager-thread_event",
                                      args=(manager_queue, main_queue))
    manager_thread.start()
except Exception:
    logger.error("Exception when starting manager! %s", traceback.format_exc())
    sys.exit(1)

# Main loop: react to caught signals and wait for the manager to finish.
#
# The queue is polled with a timeout instead of calling signal.pause() and
# then blocking on get(): a signal delivered between those two calls left the
# main thread stuck in pause() until some later signal arrived. A timed get()
# lets signal handlers run while waiting (an untimed get() on Python 2 does
# not), so no wake-up can be lost.
exitcode = 0
terminating_signals = (signal.SIGINT, signal.SIGHUP, signal.SIGQUIT,
                       signal.SIGABRT, signal.SIGTERM)
while True:
    try:
        response = main_queue.get(True, 1.0)
    except Queue.Empty:
        continue
    if type(response) is EventCaughtSignal:
        logger.info("Caught signal! %s" % response.signal)
        # Was this a terminating signal?
        if response.signal in terminating_signals:
            manager_queue.put(EventShutdown())  # push event to manager
    elif type(response) is EventMainThreadFinalizeShutdown:
        logger.info("Finalizing shutdown in main thread_event...")
        if response.traceback_string != "":
            logger.error("Terminating from exception!")
            logger.error(response.traceback_string)
            exitcode = 1
        break  # break main loop
    else:
        logger.warning("Unrecognized event in main thread_event: %s"
                       % type(response))

# Final dispensation
if pidfile:
    try:
        logger.debug("Removing pidfile %s" % pidfile)
        os.unlink(pidfile)
        logger.debug("Removed pidfile %s" % pidfile)
    except OSError:
        logger.warning("pidfile was already deleted?")
logger.info("Exiting with code %s" % exitcode)
notify_mail("Idler Daemon exiting with code %s" % exitcode,
            "Idler daemon shutdown with exit code %s" % exitcode)
sys.exit(exitcode)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment