@mdalp
Created February 16, 2018 12:22
Proof of concept for a log handler that defers log calls to another process.
from __future__ import absolute_import, division, unicode_literals, print_function

import atexit
import logging
import multiprocessing
import sys
import time
import traceback
from datetime import datetime

try:
    import cPickle as pickle
except ImportError:
    # Python >=3.0
    import pickle

try:
    import queue
except ImportError:
    # Python <3.0
    import Queue as queue

log_format = '%(asctime)s - %(name)s - %(levelname)s - %(lineno)d - %(processName)s - %(message)s'
formatter = logging.Formatter(log_format)
logging.basicConfig(format=log_format)


class MultiProcessingHandlerMixin(object):
    def __init__(self, log_format, name=None, *args, **kwargs):
        super(MultiProcessingHandlerMixin, self).__init__(*args, **kwargs)
        name = name or self.__class__.__name__
        self.queue = multiprocessing.Queue(-1)
        self._is_closed = multiprocessing.Event()
        # The formatter has to be set here to be shared with the child process.
        self.formatter = logging.Formatter(log_format)
        self._handler_process = multiprocessing.Process(target=self._log_consumer, name=name)
        self._handler_process.start()
        atexit.register(self.close)

    def _log_consumer(self):
        # Drain the queue until close() has been called and no records remain.
        while not (self._is_closed.is_set() and self.queue.empty()):
            try:
                record = self.queue.get(timeout=0.2)
                # Expose the current backlog so formats can use %(qsize)s.
                record.qsize = self.queue.qsize()
                super(MultiProcessingHandlerMixin, self).emit(record)
            except EOFError:
                break
            except queue.Empty:
                pass  # The timeout lets us periodically re-check whether the handler is closed.
            except Exception:
                traceback.print_exc(file=sys.stderr)
        self.queue.close()
        self.queue.join_thread()

    @staticmethod
    def _make_record_pickle_safe(record):
        """Clean record args to make them picklable.

        To send data to another process, pickle is used under the hood.
        Not everything is picklable (e.g. lambda functions); when an arg
        is not picklable we log its string representation instead.
        It's a bit hacky but it does the job.
        """
        args = list(record.args)
        for i, arg in enumerate(args):
            try:
                pickle.dumps(arg)
            except (pickle.PicklingError, TypeError):
                logging.exception('Pickling error while sending data to log processor, data will be stringified.')
                args[i] = str(arg)
        record.args = tuple(args)
        return record

    def emit(self, record):
        """Put the record on a queue that is consumed by the handler process."""
        record = self._make_record_pickle_safe(record)
        self.queue.put_nowait(record)

    def close(self):
        if not self._is_closed.is_set():
            self._is_closed.set()
            self._handler_process.join(5.0)  # Waits for the queue to empty.
            super(MultiProcessingHandlerMixin, self).close()


class SlowStreamHandler(logging.FileHandler):
    """A deliberately slow file handler, used to show the benefit of the async mixin."""

    def emit(self, record):
        time.sleep(0.5)
        super(SlowStreamHandler, self).emit(record)


class AsyncStreamHandler(MultiProcessingHandlerMixin, SlowStreamHandler):
    pass


def configure_loggers():
    async_logger = logging.getLogger('async')
    async_format = '%(asctime)s - %(name)s - %(levelname)s - %(lineno)d - %(processName)s[%(qsize)s] - %(message)s'
    handler = AsyncStreamHandler(log_format=async_format, name='mp-handler', filename='/tmp/temp.log')
    handler.setLevel(logging.DEBUG)
    async_logger.addHandler(handler)
    async_logger.setLevel(logging.DEBUG)
    async_logger.propagate = False

    normal_logger = logging.getLogger('sync')
    handler = logging.FileHandler(filename='/tmp/temp.log')
    handler.setFormatter(formatter)
    normal_logger.addHandler(handler)
    normal_logger.setLevel(logging.DEBUG)


if __name__ == '__main__':
    configure_loggers()
    async_logger = logging.getLogger('async')
    normal_logger = logging.getLogger('sync')

    for i in range(3):
        normal_logger.info('This is a normal logger that runs alongside the async one.')
        async_logger.debug('Async log %d at time %s', i, datetime.today())
    async_logger.debug('This is a log with an unpicklable object %s', lambda: 1)
    sys.stdout.write('This should be printed with no wait.\n')
    sys.stdout.flush()
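
Because the mixin only relies on the standard logging.Handler interface, the same pattern should compose with other handlers as well. Below is a minimal sketch, assuming a fork start method (the default on Linux) so the handler state is inherited by the child process; the AsyncConsoleHandler class and the 'console' logger are illustrative names, not part of the gist:

class AsyncConsoleHandler(MultiProcessingHandlerMixin, logging.StreamHandler):
    # Hypothetical combination: emit() enqueues the record in the parent,
    # and the child process writes it to stderr via StreamHandler.emit().
    pass

console_logger = logging.getLogger('console')
console_logger.addHandler(AsyncConsoleHandler(log_format=log_format, name='console-handler'))
console_logger.setLevel(logging.INFO)
console_logger.propagate = False
console_logger.info('Emitted without blocking the caller.')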