Skip to content

Instantly share code, notes, and snippets.

@sacko87
Last active February 3, 2019 13:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sacko87/2b0282f5b3af0e8047f83bd584517af2 to your computer and use it in GitHub Desktop.
Save sacko87/2b0282f5b3af0e8047f83bd584517af2 to your computer and use it in GitHub Desktop.
A logger that works between multiprocessing Processes.
import logging
import os
import signal
from logging.handlers import QueueHandler, RotatingFileHandler, QueueListener
from multiprocessing import Manager
# Debug switch read from the environment once at import time.  The raw
# environment string is kept, so ANY non-empty value (even "0" or "false")
# is truthy; when the variable is unset the value is the boolean False.
DEBUG = os.getenv("DEBUG", False)
def wrap(f, *args, **kwargs):
    """Run *f* with a multiprocessing manager and queue-backed logging.

    Logging in a multiple process environment is a pain! This wrapper does a
    couple of things.

    Firstly it creates a multiprocessing manager that can create and control
    shared objects between processes. SIGINT is ignored while the manager
    starts so that the manager process inherits the "ignore" disposition and
    does not tear down shared resources (for example queues, which would then
    fail with EOFError) when the user presses Ctrl-C.

    Using this manager it then sets up logging through a shared queue: a
    ``QueueHandler`` is attached to the root logger and a ``QueueListener``
    forwards every record to a single destination handler, so all processes
    can log to the same location.

    Finally it calls ``f(*args, manager=manager, **kwargs)`` in the current
    process and always cleans up afterwards.

    Args:
        f: Callable to run. It receives the manager as the keyword argument
            ``manager`` in addition to the forwarded arguments.
        *args: Positional arguments forwarded to *f*.
        **kwargs: Keyword arguments forwarded to *f*, except for the two
            options below, which are consumed here:

            signal_handler: SIGINT handler to install in the parent once the
                manager has started. Defaults to the handler that was
                installed when ``wrap`` was called.
            logging_listener_handler: ``logging.Handler`` that receives all
                queued records; it is used as given. Defaults to a
                ``StreamHandler`` when ``DEBUG`` is truthy, otherwise a
                ``RotatingFileHandler`` writing ``application.log``; the
                default handler gets a process-aware formatter.

    Returns:
        None. The return value of *f* is discarded.

    Notes:
        ``KeyboardInterrupt`` raised by *f* is logged as a warning and any
        other ``Exception`` has its traceback printed to stderr; neither is
        re-raised. The root logger's level is set to ``DEBUG`` and is left
        that way after the call.
    """
    import sys
    import traceback

    try:
        sigint_handler = kwargs.pop("signal_handler")
    except KeyError:
        # remember whatever the parent currently uses so it can be reinstated
        sigint_handler = signal.getsignal(signal.SIGINT)

    # Ignore SIGINT *before* the manager process is created so it inherits the
    # ignore; this removes the race between the manager starting up and the
    # manager ignoring the signal on its own.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        manager = Manager()
    finally:
        # reinstate the SIGINT handler for the parent, even if the manager
        # failed to start, so the parent is never left deaf to Ctrl-C
        signal.signal(signal.SIGINT, sigint_handler)

    try:
        # create the logging listener handler
        try:
            logging_listener_handler = kwargs.pop("logging_listener_handler")
        except KeyError:
            # NOTE: no maxBytes is given, so this file handler never actually
            # rolls over; backupCount only matters once maxBytes is non-zero.
            logging_listener_handler = (
                RotatingFileHandler("application.log", backupCount=3)
                if not DEBUG
                else logging.StreamHandler()
            )
            logging_listener_handler.setFormatter(
                logging.Formatter(
                    "%(asctime)s %(processName)-10s %(name)s "
                    "%(levelname)-8s %(message)s"
                )
            )

        # shared, unbounded queue that carries log records between processes
        logging_queue = manager.Queue(-1)

        # set up the root logger to feed the queue
        queue_handler = QueueHandler(logging_queue)
        root_logger = logging.getLogger()
        root_logger.addHandler(queue_handler)
        root_logger.setLevel(logging.DEBUG)

        # create and start the logging listener
        logging_listener = QueueListener(logging_queue, logging_listener_handler)
        logging_listener.start()
        try:
            # call f with the manager in tow, in this very process
            f(*args, manager=manager, **kwargs)
        except KeyboardInterrupt:
            logging.warning("Exiting, we hope you've cleaned up...")
        except Exception:
            traceback.print_exc(file=sys.stderr)
        finally:
            # Detach first so nothing is written to the queue once it is gone;
            # stop() enqueues its own sentinel and joins the listener thread,
            # which drains every record queued before it.
            root_logger.removeHandler(queue_handler)
            logging_listener.stop()
    finally:
        manager.shutdown()
if __name__ == "__main__":
    # Smoke test: run a trivial job through the wrapper when executed
    # directly as a script.

    def main(*args, **kwargs):
        """Emit a single INFO record through the root logger."""
        logging.info("This is a test.")

    wrap(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment