Last active Feb 3, 2019
A logger that works between multiprocessing Processes.
import logging
import os
import signal
from logging.handlers import QueueHandler, RotatingFileHandler, QueueListener
from multiprocessing import Manager
DEBUG = os.environ.get("DEBUG", False)
def wrap(f, *args, **kwargs):
Logging in a multiple process environment is a pain!
This wrapper does a couple of things.
Firstly it creates an mtp manager that can create and control shared objects
between processes. The manager ignores SIGINT to allow a clean closure of the
application. We don't want queues to die because of an EOFError!
Using this manager, it then sets up a logging utility that uses a queue back end
allowing all processes to log to the same location. I'm useful like that :).
It then calls your function with the manager as a parameter, any remaining
arguments are passed along to it.
sigint_handler = kwargs.pop("signal_handler")
except KeyError:
# ignore SIGINT
sigint_handler = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, signal.SIG_IGN)
# create a manager
# inherits SIGINT stopping it from closing resources during SIGINT
manager = Manager()
# reinstate the SIGINT handler for the parent
# ignoring SIGINT and reinstating it removes a race condition between
# the time it takes for the manager to start up and then ignore it itself
signal.signal(signal.SIGINT, sigint_handler)
# create the logging listener handler
logging_listener_handler = kwargs.pop("logging_listener_handler")
except KeyError:
logging_listener_handler = (
RotatingFileHandler("application.log", backupCount=3)
if not DEBUG
else logging.StreamHandler()
"%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s"
# ... guess what i do
logging_queue = manager.Queue(-1)
# setup the root logger to use it
queue_handler = QueueHandler(logging_queue)
root_logger = logging.getLogger()
# create and start the logging listener
logging_listener = QueueListener(logging_queue, logging_listener_handler)
# call f with the manager in tow
# i'm nice and i'd like you to use the same process
f(manager=manager, *args, **kwargs)
except KeyboardInterrupt:
logging.warning("Exiting, we hope you've cleaned up...")
except Exception:
import sys
import traceback
# clean up the listener
if __name__ == "__main__":
def main(*args, **kwargs):"This is a test.")
