Created
February 9, 2011 07:47
-
-
Save pcdinh/818114 to your computer and use it in GitHub Desktop.
Multiple process logging
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import time | |
from logging import basicConfig, getLogger, Formatter, StreamHandler | |
from logging.handlers import RotatingFileHandler | |
import multiprocessing, threading, logging, sys, traceback | |
class PipeLogHandler(logging.Handler):
    """Feed log records from child processes to the parent process via a queue.

    ``emit`` never writes to disk itself: it flattens the record so that it
    can be pickled and puts it on a ``multiprocessing.Queue``.  Unless the
    handler is created with ``child=True``, a daemon thread drains that queue
    and hands every record to a private ``RotatingFileHandler``.
    """

    # Set to True by close(); tells the receiver thread to stop once drained.
    shutdown = False
    # multiprocessing.Queue carrying the records; created per instance.
    queue = None
    # Seconds the receiver waits on an empty queue before re-checking
    # ``shutdown``.  Kept at class level so close() also works on handlers
    # created with child=True, which never run the receiver set-up.
    polltime = 1
    # Class-level fallbacks so close()/__del__ stay safe on an instance whose
    # __init__ did not run to completion.
    _handler = None
    _receiver = None
    _pipe_closed = False

    def __init__(self, filename, mode='a', maxBytes=0, backupCount=0, encoding=None, delay=0, child=False):
        """Create the file handler, the queue and (optionally) the receiver.

        Args:
            filename, mode, maxBytes, backupCount, encoding, delay: forwarded
                unchanged to ``RotatingFileHandler``.
            child: when true, no receiver thread is started; the handler only
                puts records on the queue.
        """
        logging.Handler.__init__(self)
        # encoding and delay used to be accepted but silently dropped.
        self._handler = RotatingFileHandler(
            filename,
            mode=mode,
            maxBytes=maxBytes,
            backupCount=backupCount,
            encoding=encoding,
            delay=delay,
        )
        self.queue = multiprocessing.Queue(-1)
        # we only want one of the loggers to be pulling from the queue.
        # If there is a way to do this without needing to be passed this
        # information, that would be great!
        if not child:
            self.shutdown = False
            self.polltime = 1
            receiver = threading.Thread(target=self.receive)
            receiver.daemon = True
            receiver.start()
            self._receiver = receiver

    def setFormatter(self, fmt):
        """Install *fmt* on this handler and on the wrapped file handler."""
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        """Drain the queue into the file handler until shut down and empty."""
        # Imported here so the module-level import block stays untouched;
        # the module is named ``Queue`` on Python 2 and ``queue`` on Python 3.
        try:
            from queue import Empty
        except ImportError:
            from Queue import Empty

        while not self.shutdown or not self.queue.empty():
            try:
                # A bounded wait lets the loop notice ``shutdown``; an
                # unbounded get() would block forever on an idle queue.
                record = self.queue.get(True, self.polltime)
                self._handler.emit(record)
            except Empty:
                continue
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except Exception:
                traceback.print_exc(file=sys.stderr)
                time.sleep(0.1)

    def send(self, s):
        """Put *s* on the queue for the receiver to retrieve."""
        self.queue.put(s)

    def _format_record(self, record):
        """Flatten *record* in place so it can be pickled, and return it.

        The message is merged with its arguments and the exception info is
        rendered to text (cached by ``format`` on the record) before being
        dropped.  This removes any chance of unpicklable objects travelling
        with the record and may reduce the size of what is sent.
        """
        # getMessage() also copes with non-string messages, which a plain
        # ``msg % args`` does not.
        record.msg = record.getMessage()
        record.args = None
        if record.exc_info:
            # Called only for its side effect of caching the traceback text.
            self.format(record)
            record.exc_info = None
        return record

    def emit(self, record):
        """Flatten *record* and queue it; report failures via handleError."""
        try:
            self.send(self._format_record(record))
        except (KeyboardInterrupt, SystemExit, EOFError):
            raise
        except Exception:
            self.handleError(record)

    def close(self):
        """Stop the receiver (if any) and close the wrapped file handler.

        Safe to call more than once: only the first call does any work, so a
        later ``__del__`` or ``logging.shutdown()`` does not sleep again.
        """
        if self._pipe_closed:
            return
        self._pipe_closed = True

        receiver = self._receiver
        if receiver is not None:
            # give some time for messages to enter the queue.
            time.sleep(self.polltime + 1)
        self.shutdown = True
        if receiver is not None and receiver is not threading.current_thread():
            # The receiver re-checks ``shutdown`` at least every ``polltime``
            # seconds, so this bounded join lets it finish writing.
            receiver.join(self.polltime + 1)

        if self._handler is not None:
            self._handler.close()
        logging.Handler.close(self)

    def __del__(self):
        """Best-effort close when the handler is garbage collected."""
        # hopefully this aids in orderly shutdown when things are going poorly.
        try:
            self.close()
        except Exception:
            # Deliberate: the object may be half-initialised, and an error
            # raised from __del__ cannot be handled by anyone anyway.
            pass
def setup_logger(name=None, logdir=None, filename=None, level=None, maxBytes=0):
    """Return the logger *name*, writing through a PipeLogHandler.

    Args:
        name: logger name passed to ``getLogger``.
        logdir: directory for the log file (required); created, including
            missing parents, when it does not exist yet.
        filename: log file name inside *logdir* (required).
        level: logging level; when ``None`` the logger's current level is
            left untouched.
        maxBytes: rotation threshold forwarded to the handler; five backup
            files are kept.

    Returns:
        The configured logger.
    """
    logdir = os.path.abspath(logdir)
    # Create-then-check instead of check-then-create: another process may
    # make the directory between an exists() test and the mkdir() call.
    try:
        os.makedirs(logdir)
    except OSError:
        if not os.path.isdir(logdir):
            raise

    log = getLogger(name)
    # setLevel(None) raises, so the default level=None must be skipped.
    if level is not None:
        log.setLevel(level)

    log_path = os.path.join(logdir, filename)
    # Calling this function again for the same logger and file must not
    # stack a second handler, or every record would be written twice.
    for existing in log.handlers:
        target = getattr(getattr(existing, "_handler", None), "baseFilename", None)
        if isinstance(existing, PipeLogHandler) and target == log_path:
            break
    else:
        pipe_handler = PipeLogHandler(log_path, maxBytes=maxBytes, backupCount=5)
        pipe_handler.setFormatter(Formatter("%(asctime)s - %(message)s"))
        log.addHandler(pipe_handler)

    log.info("Logger %s initialized", name)
    return log
def test_child_process():
    """Log this process's ID twice through the "parent" logger, then print it."""
    log = setup_logger("parent", "/tmp", "parent.log", logging.DEBUG, 1024*1024)
    # Two identical log entries, exactly as the demo has always produced.
    for _ in range(2):
        log.info("Child process ID %s" % os.getpid())
    print("Child process ID %s" % os.getpid())
if __name__ == '__main__':
    # Demo: fork four children that log through the shared logger, then reap
    # them without blocking and shut logging down.
    import errno  # only needed by this script section

    logger = setup_logger("parent", "/tmp", "parent.log", logging.DEBUG, 1024*1024)
    # Bound before the try so the reaping loop below can always read it,
    # even when the very first fork fails.
    pids = []
    try:
        for i in range(1, 5):
            pid = os.fork()
            if pid:
                # parent
                print("Parent PID %s" % os.getpid())
                pids.append(pid)
                logger.info("Parent process gets started")
            else:
                # child
                exit_code = 0
                try:
                    print(os.getpid())
                    test_child_process()
                    time.sleep(0.5)
                    print("Child process PID %s will exits now" % os.getpid())
                except BaseException:
                    traceback.print_exc(file=sys.stdout)
                    exit_code = 1
                finally:
                    # A child must never fall through into the parent's
                    # error handling and reaping loop below.
                    os._exit(exit_code)
    except BaseException as e:
        print(str(e))
        traceback.print_exc(file=sys.stdout)
    try:
        while pids:
            try:
                spid, status = os.waitpid(0, os.WNOHANG)
            except OSError as e:
                print(str(e))
                if e.errno == errno.ECHILD:
                    # No children left to wait for; retrying would spin
                    # forever because ``pids`` would never shrink.
                    break
            else:
                if spid > 0:
                    print("The child process %s exits" % spid)
                    # Guard against a reaped pid this loop did not record.
                    if spid in pids:
                        pids.remove(spid)
                    continue
                print("No child process exits")
            finally:
                time.sleep(0.1)
    finally:
        print(pids)
    logging.shutdown()
    print("Parent process exits")
    sys.exit()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment