Created
February 16, 2018 12:22
-
-
Save mdalp/7c2e9ba4fa1972f6cdd5162f46e40ad2 to your computer and use it in GitHub Desktop.
Proof of concept for a log handler that defers log calls to another process.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import absolute_import, division, unicode_literals, print_function | |
import atexit | |
import cPickle | |
import logging | |
import multiprocessing | |
import sys | |
import time | |
import traceback | |
from datetime import datetime | |
try: | |
import queue | |
except ImportError: | |
# Python <3.0 | |
import Queue as queue | |
# Shared format for the demo loggers; %(processName)s shows which process
# emitted the record (MainProcess vs. the handler's consumer child process).
log_format = '%(asctime)s - %(name)s - %(levelname)s - %(lineno)d - %(processName)s - %(message)s'
formatter = logging.Formatter(log_format)
# Configure the root logger so stray records (e.g. pickling warnings) are visible.
logging.basicConfig(format=log_format)
class MultiProcessingHandlerMixin(object):
    """Handler mixin that hands log records off to a consumer child process.

    ``emit`` only enqueues the record on a ``multiprocessing.Queue``; a
    dedicated child process pops records off the queue and forwards them to
    the next handler class in the MRO, so a slow sink never blocks the
    logging caller.
    """

    def __init__(self, log_format, name=None, *args, **kwargs):
        """Start the consumer process.

        :param log_format: format string used by the shared formatter.
        :param name: name for the consumer process; defaults to the class name.

        Remaining arguments are forwarded to the next handler in the MRO.
        """
        super(MultiProcessingHandlerMixin, self).__init__(*args, **kwargs)
        name = name or self.__class__.__name__
        self.queue = multiprocessing.Queue(-1)
        self._is_closed = multiprocessing.Event()
        # The formatter has to be set here to be shared with the child process.
        self.formatter = logging.Formatter(log_format)
        self._handler_process = multiprocessing.Process(target=self._log_consumer, name=name)
        self._handler_process.start()
        # Make sure the queue is drained and the child joined on interpreter exit.
        atexit.register(self.close)

    def _log_consumer(self):
        """Consumer loop (runs in the child process): drain the queue and emit."""
        while not (self._is_closed.is_set() and self.queue.empty()):
            try:
                record = self.queue.get(timeout=0.2)
                # Expose the backlog size so formats can use %(qsize)s.
                record.qsize = self.queue.qsize()
                super(MultiProcessingHandlerMixin, self).emit(record)
            except EOFError:
                # Feeder pipe closed from the other side: nothing left to read.
                break
            except queue.Empty:
                pass  # This periodically checks if the logger is closed.
            except Exception:
                # FIX: was a bare ``except:``, which also swallowed SystemExit
                # and KeyboardInterrupt; only report unexpected errors here.
                traceback.print_exc(file=sys.stderr)
        self.queue.close()
        self.queue.join_thread()

    @staticmethod
    def _make_record_pickle_safe(record):
        """Clean record args to make them picklable.

        To send data to another process, pickle is used under the hood.
        Not everything is picklable (e.g. lambda functions); anything that
        fails to pickle is replaced by its string representation.

        Handles all shapes ``record.args`` can take: ``None``/empty, a
        mapping (``logger.info('%(a)s', {'a': 1})``) and a plain tuple.
        """
        # ``cPickle`` only exists on Python 2; fall back to ``pickle``.
        try:
            import cPickle as pickle_mod
        except ImportError:
            import pickle as pickle_mod
        if not record.args:
            # Nothing to sanitize (args is None or empty).
            return record
        # AttributeError: Python < 3.12 raises it for unpicklable local objects.
        unpicklable = (pickle_mod.PicklingError, TypeError, AttributeError)
        if isinstance(record.args, dict):
            # FIX: %-mapping logging stores a single dict in ``args``; the old
            # code iterated it as a sequence and replaced args with its keys.
            safe = dict(record.args)
            for key in safe:
                try:
                    pickle_mod.dumps(safe[key])
                except unpicklable:
                    logging.exception('Pickling error while sending data to log processor, data will be stringified.')
                    safe[key] = str(safe[key])
            record.args = safe
            return record
        args = list(record.args)
        for i, arg in enumerate(args):
            try:
                pickle_mod.dumps(arg)
            except unpicklable:
                logging.exception('Pickling error while sending data to log processor, data will be stringified.')
                args[i] = str(arg)
        record.args = tuple(args)
        return record

    def emit(self, record):
        """Put the record in a queue that gets processed by an other process."""
        record = self._make_record_pickle_safe(record)
        self.queue.put_nowait(record)

    def close(self):
        """Signal the consumer, wait for it to drain the queue, then close."""
        if not self._is_closed.is_set():
            self._is_closed.set()
            self._handler_process.join(5.0)  # Waits for receive queue to empty.
        super(MultiProcessingHandlerMixin, self).close()
class SlowStreamHandler(logging.FileHandler):
    """File handler with an artificial delay before each write.

    Simulates an expensive sink (slow disk, network) to demonstrate how a
    synchronous handler blocks the logging caller.
    """

    # Seconds to stall before every record is written.
    _DELAY = 0.5

    def emit(self, record):
        time.sleep(self._DELAY)
        logging.FileHandler.emit(self, record)
class AsyncStreamHandler(MultiProcessingHandlerMixin, SlowStreamHandler):
    """Slow file handler made non-blocking by the multiprocessing mixin.

    The mixin comes first in the MRO, so ``emit`` only enqueues the record;
    the consumer child process then replays it through
    ``SlowStreamHandler.emit`` (the slow sink).
    """
    pass
def configure_loggers():
    """Configure the 'async' and 'sync' demo loggers (both write /tmp/temp.log)."""
    # Asynchronous logger: records travel through the multiprocessing queue.
    # %(qsize)s is the extra attribute added by the consumer process.
    async_format = '%(asctime)s - %(name)s - %(levelname)s - %(lineno)d - %(processName)s[%(qsize)s] - %(message)s'
    mp_handler = AsyncStreamHandler(log_format=async_format, name='mp-handler', filename='/tmp/temp.log')
    mp_handler.setLevel(logging.DEBUG)
    logger = logging.getLogger('async')
    logger.addHandler(mp_handler)
    logger.setLevel(logging.DEBUG)
    # Keep async records away from the root handlers.
    logger.propagate = False

    # Synchronous logger: a plain FileHandler sharing the module formatter.
    file_handler = logging.FileHandler(filename='/tmp/temp.log')
    file_handler.setFormatter(formatter)
    logger = logging.getLogger('sync')
    logger.addHandler(file_handler)
    logger.setLevel(logging.DEBUG)
if __name__ == '__main__':
    # Demo: the sync logger blocks ~0.5 s per record, the async one returns
    # immediately, and the final print appears before the async backlog drains.
    configure_loggers()
    async_logger = logging.getLogger('async')
    normal_logger = logging.getLogger('sync')
    for iteration in range(3):
        normal_logger.info('This is a normal logger that runs along side the async one.')
        async_logger.debug('Async log %d at time %s', iteration, datetime.today())
    # Exercises the pickle-safety fallback (lambdas cannot be pickled).
    async_logger.debug('This is a log with an unpicklable object %s', lambda: 1)
    sys.stdout.write('This should be printed with no wait.\n')
    sys.stdout.flush()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment