Skip to content

Instantly share code, notes, and snippets.

@impredicative
Created May 26, 2020 13:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save impredicative/009837e475402403dac39fc2a82f8549 to your computer and use it in GitHub Desktop.
Save impredicative/009837e475402403dac39fc2a82f8549 to your computer and use it in GitHub Desktop.
BufferingSMTPHandler
import datetime
import logging
import smtplib
import sys
import threading
import time
import traceback

try:
    import Queue  # Python 2 module name
except ImportError:  # Python 3 renamed the module to ``queue``
    import queue as Queue
class BufferingSMTPHandler(logging.Handler):
    """Logging handler that batches formatted records into emails.

    ``emit`` only formats a record and puts it on a queue shared by every
    instance of the class.  Each instance starts a daemon thread (``run``)
    that periodically moves the shared records into a per-instance queue and
    sends them as a single plain-text email via ``smtplib.SMTP().connect()``
    (i.e. smtplib's default host and port).  The background threads leave at
    least ``_SEND_INTERVAL`` seconds between successful sends.
    """

    # Configurable parameters (all in seconds)
    _POLL_INTERVAL = 5  # Interval between checks for sets of new records.
    _POLL_DURATION_MAX = 10  # If a record is available, max time to continue
    # polling for more records.
    _SEND_INTERVAL = 2 * 60  # Interval between sends.

    # Class-level state, shared by all instances
    _Q = Queue.Queue()  # formatted records waiting to be picked up
    _LOCK = threading.Lock()  # protects _LAST_SEND_TIME and the instance _q
    _LAST_SEND_TIME = float('-inf')  # time.time() of last send; -inf = never
    _STRING_TYPES = (str, type(u''))  # text types on both Python 2 and 3

    def state(self):
        """Return a dict describing the current state of the handler.

        The values are meant for display: queue sizes, the configured
        intervals, the time since the last email and a lower bound on the
        time until the next one, and the recipient list.
        """
        # Use a single clock reading, on the same clock that
        # _LAST_SEND_TIME is recorded with, so the deltas are consistent.
        now = time.time()
        last_send = self._LAST_SEND_TIME
        if last_send == float('-inf'):
            time_since_last = '(none sent yet)'
            time_until_next = datetime.timedelta(0)
        else:
            time_since_last = datetime.timedelta(seconds=now - last_send)
            remaining = last_send + self._SEND_INTERVAL - now
            time_until_next = datetime.timedelta(seconds=max(remaining, 0))
        return {
            'Total number of unprocessed errors':
                self._Q.qsize() + self._q.qsize(),
            'Intervals': 'Poll: {}s, Send: {}s'.format(
                self._POLL_INTERVAL, self._SEND_INTERVAL),
            'Poll duration max': '{}s'.format(self._POLL_DURATION_MAX),
            'Time since last email': time_since_last,
            # This simplification doesn't account for _POLL_INTERVAL and
            # _POLL_DURATION_MAX, etc.
            'Time to next earliest possible email':
                'at least {}'.format(time_until_next),
            'Recipients': self._header['toaddrs_str'],
        }

    def __init__(self, fromaddr, toaddrs, subject):
        """Create the handler and start its background sender thread.

        fromaddr -- envelope and header sender address.
        toaddrs  -- iterable of recipient addresses, or a single address
                    given as a string.
        subject  -- subject line used for every email.
        """
        # Initialise the base handler before the worker thread can run, so
        # the thread never sees a partially constructed handler.
        super(BufferingSMTPHandler, self).__init__()
        # Setup instance environment
        self._active = True
        self._q = Queue.Queue()  # this is different from self._Q
        self._header = self._build_header(fromaddr, toaddrs, subject)
        # Start main buffer-processor thread.
        # Note: The class is intentionally not inherited from
        # threading.Thread, as doing so was found to result in the target
        # thread not being named correctly, possibly due to a namespace
        # collision.
        thread_name = '{}Thread'.format(self.__class__.__name__)
        thread = threading.Thread(target=self.run, name=thread_name)
        thread.daemon = True
        thread.start()

    @classmethod
    def _build_header(cls, fromaddr, toaddrs, subject):
        """Return the dict of header fields plus the rendered header text."""
        if isinstance(toaddrs, cls._STRING_TYPES):
            # A bare string would otherwise be joined character by character.
            toaddrs = [toaddrs]
        else:
            # Materialise once so one-shot iterables survive being used for
            # both the To: header and the SMTP envelope.
            toaddrs = list(toaddrs)
        header = {
            'fromaddr': fromaddr,
            'toaddrs': toaddrs,
            'toaddrs_str': ','.join(toaddrs),
            'subject': subject,
        }
        header['header'] = (
            'From: {fromaddr}\r\nTo: {toaddrs_str}\r\n'
            'Subject: {subject}\r\n\r\n'.format(**header))
        return header

    def close(self):
        """Stop the background loop and process some remaining records."""
        super(BufferingSMTPHandler, self).close()
        self._active = False
        # Shorten the polling window on this instance only, so shutdown is
        # not held up; no need to set self.__class__._POLL_DURATION_MAX.
        self._POLL_DURATION_MAX = min(0.25, self._POLL_DURATION_MAX)
        self._process_recordset()

    def emit(self, record):
        """Format a record and queue it so it can be emailed collectively.

        This method can be called by various threads.  Formatting errors are
        routed through ``handleError`` instead of being raised into the code
        that issued the logging call.
        """
        try:
            self._Q.put(self.format(record))
        except Exception:
            self.handleError(record)

    def run(self):
        """Thread target: periodically flush the buffer until closed."""
        while self._active:
            with self._LOCK:  # protects _LAST_SEND_TIME and _q
                next_send_time = self._LAST_SEND_TIME + self._SEND_INTERVAL
                if time.time() > next_send_time:
                    self._process_recordset()
                    sleep_time = self._POLL_INTERVAL
                else:
                    # Here next_send_time cannot be -inf.
                    sleep_time = max(next_send_time - time.time(), 0)
            # Sleep outside the lock so other instances can proceed.
            time.sleep(sleep_time)

    def _process_recordset(self):
        """Collect a set of queued records and email them.

        ``_LAST_SEND_TIME`` is only advanced when the send succeeds.  A
        delivery failure is reported on stderr (subject to
        ``logging.raiseExceptions``) rather than raised, so the background
        thread keeps running and retries on a later pass.
        """
        try:
            self._move_recordset_from_Q_to_q()
            if not self._q.empty():
                self._send_records_from_q()
                self.__class__._LAST_SEND_TIME = time.time()
        except (KeyboardInterrupt, SystemExit):
            pass
        except Exception:
            if logging.raiseExceptions and sys.stderr:
                traceback.print_exc(file=sys.stderr)

    def _move_recordset_from_Q_to_q(self):
        """Move a set of records from class queue to instance queue.

        Returns immediately if nothing is pending at all; otherwise keeps
        polling the class queue for late arrivals for up to
        ``_POLL_DURATION_MAX`` seconds.
        """
        deadline = time.time() + self._POLL_DURATION_MAX
        while time.time() < deadline:
            try:
                self._q.put(self._Q.get_nowait())
                self._Q.task_done()
            except Queue.Empty:
                if self._q.empty():
                    break
                time.sleep(0.1)

    def _send_records_from_q(self):
        """Email all records currently in the instance queue.

        If delivery fails, the records are put back on the instance queue
        and the exception is re-raised for the caller to report.
        """
        records = []
        try:
            # Get formatted records from instance queue
            while True:
                records.append(self._q.get_nowait())
                self._q.task_done()
        except (Queue.Empty, KeyboardInterrupt, SystemExit):
            # Best effort: send whatever was collected so far.
            pass
        if not records:
            return
        body = 'Included messages: {}\r\n'.format(len(records))
        num_pending_messages = self._Q.qsize() + self._q.qsize()
        if num_pending_messages > 0:
            body += 'Pending messages: {}\r\n'.format(num_pending_messages)
        # Add main content of message body
        body += '\r\n'
        body += '\r\n\r\n'.join(records)
        msg = self._header['header'] + body
        try:
            self._deliver(msg)
        except Exception:
            # Keep the records for the next attempt instead of losing them.
            for record in records:
                self._q.put(record)
            raise

    def _deliver(self, msg):
        """Send one complete message over SMTP, always closing the socket."""
        smtp = smtplib.SMTP()
        try:
            smtp.connect()
            smtp.sendmail(self._header['fromaddr'],
                          self._header['toaddrs'], msg)
            smtp.quit()
        finally:
            # Harmless after a successful quit(); releases the socket when
            # connect() or sendmail() failed part-way.
            smtp.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment