A daemon script for watching watching IMAP accounts with IDLE and invoking getmail. Works with GetMail config files.
#!/usr/bin/env python | |
import imaplib2 | |
import time | |
import threading | |
import subprocess | |
import argparse | |
import signal | |
import psutil | |
import sys | |
import os | |
import ConfigParser | |
import pwd | |
import traceback | |
import Queue | |
import logging | |
import logging.handlers | |
import socket | |
# use this to flash our emergency mail on a hard shutdown | |
from email.mime.text import MIMEText | |
# Logging | |
verbosity = 0 | |
pidfile = None | |
daemonize = False | |
start_shutdown = threading.Event() | |
CONST_shutdown = "dienow" | |
# Send notification mail to clients about server status | |
def notify_mail(subject, message, idler=None): | |
if idler is not None: | |
message = message + '\n' + "%s %s" % (idler.username, idler.server) | |
msg = MIMEText(message) | |
msg["From"] = pwd.getpwuid(os.getuid())[0] | |
msg["To"] = pwd.getpwuid(os.getuid())[0] | |
msg["Subject"] = "Getmail Idler: %s" % subject | |
try: | |
p = subprocess.Popen(['/usr/sbin/sendmail','-t'],stdin=subprocess.PIPE) | |
p.communicate(msg.as_string()) | |
except: | |
logger.error("Got exception trying to send notification email! %s" | |
% traceback.format_exc()) | |
# This is the threading object that does all the waiting on | |
# the event | |
class Idler(object): | |
def __init__(self, getmailConfig, reidle_queue): | |
self.getmailConfig = getmailConfig | |
self.reidle_queue = reidle_queue | |
self.M = None | |
self.onlineEvent = threading.Event() | |
logger.info("Finding config: %s" % os.path.basename(self.getmailConfig)) | |
if not os.path.exists(self.getmailConfig): | |
# try the .getmail directory | |
altconfigfile = os.path.join(os.path.expanduser("~"), ".getmail", | |
os.path.basename(self.getmailConfig)) | |
if not os.path.exists(altconfigfile): | |
raise Exception("%s does not exist!" % altconfigfile) | |
else: | |
self.getmailConfig = altconfigfile | |
logger.info("Parsing: %s" % self.getmailConfig) | |
config = ConfigParser.SafeConfigParser() | |
config.read(self.getmailConfig) | |
servertype = config.get('retriever', 'type') | |
if servertype.find('IMAP') == -1: | |
raise Exception("Non-IMAP configuration file won't be polled. " + | |
"You should not pass it in to this command.") | |
# Store these so we can reconnect on disconnect | |
self.username = config.get('retriever', 'username') | |
self.password = config.get('retriever', 'password') | |
self.server = config.get('retriever', 'server') | |
# Do the login | |
self.onlineEvent.set() # initially set this, so we get notified | |
self._imaplogin() | |
# IMAP login | |
def _imaplogin(self): | |
# FIXME: handle non-SSL servers!! | |
try: | |
if self.M is None: | |
self.M = imaplib2.IMAP4_SSL(self.server) | |
self.M.login(self.username, self.password) | |
# We need to get out of the AUTH state, so we just select the INBOX. | |
self.M.select("INBOX") | |
logger.info("%s : %s CONNECTED. Fetching new mail..." | |
% (self.username, self.server) ) | |
notify_mail("%s %s AVAILABLE!" % (self.username, self.server), | |
"IMAP server login SUCCEEDED.", | |
idler=self) | |
self.onlineEvent.set() | |
reidle_queue.put(self.idle) | |
except (imaplib2.IMAP4.abort, imaplib2.IMAP4.error, socket.error): | |
# Is this a new state change? | |
if self.onlineEvent.isSet(): | |
notify_mail("%s %s unavailable!" % (self.username, self.server), | |
"IMAP server login failed. System will retry.", | |
idler=self) | |
logger.warning("%s : %s DISCONNECTED. Attempting reconnect." | |
% (self.username, self.server)) | |
self.onlineEvent.clear() | |
def waiter(): | |
logger.debug("Waiter spawned and sleeping %s %s" | |
% (self.username, self.server)) | |
time.sleep(5) | |
logger.debug("Waiter requesting relogin attempt %s %s" | |
% (self.username, self.server)) | |
reidle_queue.put(self._imaplogin) | |
return | |
waitThread = threading.Thread(target=waiter) | |
waitThread.start() | |
def __del__(self): | |
# Close mailbox and shutdown | |
if self.M is not None: | |
logger.debug("About to close %s %s" % (self.username, self.server)) | |
self.M.close() | |
logger.debug("Closed %s %s" % (self.username, self.server)) | |
self.M.logout() | |
logger.info("Idler logout for %s : %s" % (self.username, self.server)) | |
# Start an idle thread | |
def idle(self): | |
try: | |
self.M.idle(callback=self.dosync) | |
logger.info("Started IDLE for %s on %s" % (self.username, self.server)) | |
except (imaplib2.IMAP4.abort, imaplib2.IMAP4.error, socket.error): | |
# If for some reason we're not logged on, then logon. Otherwise | |
# let the exception percolate up and kill the process. | |
self._imaplogin() | |
# The method that gets called when a new email arrives. | |
def dosync(self, args): | |
# Handle possible errors | |
result, arg, exc = args | |
logger.info("%s : %s" % (result[0], result[1])) | |
if result is None: | |
logger.info("Error during IDLE: %s" % str(exc)) | |
logger.info("Attempting reconnect...") | |
self._imaplogin() # Try and relogin | |
return | |
self.do_mailfetch() # Just fetch | |
reidle_queue.put(self.idle) # Requeue restart of IDLE command | |
# Call getmail or the nominated subprocess | |
def do_mailfetch(self): | |
logger.info("Getmail sync: %s" % self.getmailConfig) | |
output = "" | |
try: | |
logger.debug("Running getmail with config: %s" % self.getmailConfig) | |
output = subprocess.check_output(["getmail","-r",self.getmailConfig]) | |
logger.debug("%s" % output) | |
except subprocess.CalledProcessError, e: | |
output = e.output | |
logger.warning("Non-zero return for getmail (%s) %s" % | |
(self.getmailConfig, e.returncode)) | |
logger.debug("%s" % output) | |
logger.debug("Getmail sync complete for %s" % self.getmailConfig) | |
parser = argparse.ArgumentParser() | |
parser.add_argument("-r", | |
help="getmail configuration file to use" + | |
" (can specify more then once)", action="append", | |
dest="getmailconfigs",metavar='GETMAILRC') | |
parser.add_argument("--pid-file", "-p", nargs=1, | |
help="pidfile to use for process limiting", action="store", | |
dest="pidfile") | |
parser.add_argument("--verbose", "-v", | |
help="set output verbosity", action="count", | |
dest="verbosity") | |
#parser.add_argument("--override-deliver", | |
# help="pidfile to use for process limiting", action="store", | |
# dest="delivercmd") | |
parser.add_argument("--daemonize", | |
help="should process daemonize?", action="store_true", | |
dest="daemonize") | |
parser.add_argument("--logfile", nargs=1, | |
help="file to redirect log output too (useful for daemon mode)", | |
action="store", dest="logfile") | |
args = parser.parse_args() | |
# Configure logging | |
logger = logging.getLogger() | |
log_formatter = logging.Formatter('%(levelname)s: %(message)s') | |
file_formatter = logging.Formatter('%(asctime)-15s: %(levelname)s: %(funcName)s : %(message)s') | |
verbosity = (40 - (args.verbosity*10)) if (40 - (args.verbosity*10)) > 0 else 0 | |
logger.setLevel(verbosity) | |
# Console logging | |
ch = logging.StreamHandler() | |
ch.setLevel(verbosity) | |
ch.setFormatter(log_formatter) | |
logger.addHandler(ch) | |
# Descriptors to preserve on daemonize | |
fd_keep = [] | |
# Maybe do file logging | |
logfile = os.path.realpath(args.logfile[0]) if (args.logfile is not None) else None | |
if logfile: | |
fh = logging.handlers.RotatingFileHandler(logfile, maxBytes=1048576, | |
backupCount=3) | |
fh.setLevel(verbosity) | |
fh.setFormatter(file_formatter) | |
logger.addHandler(fh) | |
fd_keep.append(fh.stream.fileno()) | |
pidfile = os.path.realpath(args.pidfile[0]) if len(args.pidfile) > 0 else None | |
daemonize = args.daemonize | |
getmailconfigs = [os.path.realpath(path) for path in args.getmailconfigs] | |
logger.debug("Pid File %s" % pidfile) | |
logger.debug("Daemonize: %s" % daemonize) | |
[logger.debug("Getmail Config File: %s" % getmailconfig) for getmailconfig in getmailconfigs] | |
logger.debug("Log File: %s" % logfile) | |
# Exit in error early if we somehow have no config data. | |
if len(args.getmailconfigs) == 0: | |
logger.error("No Getmail configurations specified - exiting now.") | |
sys.exit(1) | |
# Is process already running? | |
logger.debug("Pid file value is %s" % pidfile) | |
if pidfile: | |
# Was something running? | |
if os.path.isfile(pidfile): | |
# Is it still running? | |
pid = long(file(pidfile, 'r').readline()) | |
if psutil.pid_exists(pid): | |
logger.debug("Daemon already running. Silent termination.") | |
sys.exit(0) | |
else: | |
logger.debug("Found a stale pid file: %s" % pidfile) | |
# NOTE: Be quiet with logging at INFO level up to here, so we don't get cron | |
# spam. There are more sensible ways to do this of course. | |
# Check for GetMail executable usability | |
logger.info("Check for usable getmail...") | |
FNULL = open(os.devnull, 'w') | |
if subprocess.call("getmail --version", shell=True,stdout=FNULL, | |
stderr=subprocess.STDOUT) != 0: | |
logger.error("Getmail executable not available. It might not be installed.\n") | |
sys.exit(2) | |
logger.info("Getmail is usable.") | |
# Check if a pidfile was specified and warn | |
if not args.pidfile: | |
logger.warning("Running without a pidfile. Multiple executions may occur.") | |
# This is the time to daemonize if we're going | |
if daemonize: | |
try: | |
pid = os.fork() | |
if pid > 0: | |
# exit first parent | |
os._exit(0) | |
except OSError, e: | |
logger.error("fork #1 failed: %d (%s)" % (e.errno, e.strerror)) | |
sys.exit(1) | |
# decouple from parent environment | |
os.chdir("/") | |
os.setsid() | |
os.umask(0) | |
# do second fork | |
try: | |
pid = os.fork() | |
if pid > 0: | |
# exit from second parent, print write new pidfile | |
logger.info("Daemonized with PID %s" % pid) | |
logger.removeHandler(ch) # remove console handler after here | |
ch.close() | |
os._exit(0) | |
except OSError, e: | |
logger.error("fork #2 failed: %d (%s)" % (e.errno, e.strerror)) | |
sys.exit(1) | |
# At this point we are daemonized - close old file descriptors except | |
# logging | |
import resource # Resource usage logger.information. | |
maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1] | |
if (maxfd == resource.RLIM_INFINITY): | |
maxfd = MAXFD | |
#Iterate through and close all file descriptors. | |
for fd in range(0, maxfd): | |
try: | |
if not any(keep_fd == fd for keep_fd in fd_keep): | |
os.close(fd) | |
except OSError: # ERROR, fd wasn't open to begin with (ignored) | |
pass | |
# Redirect std streams | |
sys.stdout.flush() | |
sys.stderr.flush() | |
si = file(os.devnull, 'r') | |
so = file(os.devnull, 'a+') | |
se = file(os.devnull, 'a+', 0) | |
os.dup2(si.fileno(), sys.stdin.fileno()) | |
os.dup2(so.fileno(), sys.stdout.fileno()) | |
os.dup2(se.fileno(), sys.stderr.fileno()) | |
# Write new pid file | |
file(pidfile, 'w').write(str(os.getpid())) | |
# signal handler | |
def shutdown(sig, frame): | |
logger.info("Caught signal %s" % sig) | |
start_shutdown.set() | |
# Handle signals | |
for sig in (signal.SIGINT, signal.SIGHUP, signal.SIGQUIT, signal.SIGABRT, | |
signal.SIGTERM): | |
signal.signal(sig, shutdown) | |
reidle_queue = Queue.Queue() # Queue for idlers requesting reset of IDLE | |
# Sets up, IDLEs and tears down workers | |
def workerhandler(): | |
idlers = [] # Hold reference to idlers in main thread to avoid GC | |
# Otherwise queue get/put can destroy them. | |
# Setup idlers | |
try: | |
for configfile in getmailconfigs: | |
idler = Idler(configfile, reidle_queue) | |
idlers.append(idler) | |
# Process queue | |
while True: | |
obj = reidle_queue.get() | |
if type(obj) == str: | |
logger.debug("Received termination signal.") | |
break | |
obj() # should be a function | |
reidle_queue.task_done() | |
reidle_queue.task_done() | |
except: | |
tb = traceback.format_exc() | |
logger.error("%s" % tb) | |
start_shutdown.set() # Main thread should die. | |
# We're about to end this thread, but the main thread is sleeping so | |
# send a signal to ourselves to wake it up die. | |
os.kill(os.getpid(), signal.SIGINT) | |
return | |
workerhandler_thread = threading.Thread(target=workerhandler) | |
# Create an idler for each getmail config file we received that uses IMAP | |
# We will die if any of these don't work, since getting our mail is kind of | |
# important. | |
try: | |
workerhandler_thread.start() | |
# Handle signals | |
while True: | |
signal.pause() | |
if start_shutdown.is_set(): | |
break | |
except Exception, e: | |
tb = traceback.format_exc() | |
logger.error("%s" % tb) | |
finally: | |
# final dispensation | |
logger.info("Queuing IDLE worker shutdown...") | |
reidle_queue.put(CONST_shutdown) | |
workerhandler_thread.join() | |
logger.info("IDLE worker joined.") | |
# Dispatch a shutdown notification | |
notify_mail("Daemon shutting down", traceback.format_exc()) | |
if pidfile: | |
try: | |
os.unlink(pidfile) | |
logger.debug("pidfile deleted.") | |
except OSError: | |
logger.warning("pidfile was already deleted?") | |
logger.info("Process exiting.") | |
sys.exit(0) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment