Skip to content

Instantly share code, notes, and snippets.

@mjms3
Last active September 18, 2017 22:34
Show Gist options
Save mjms3/4a2ca1e645f962fece1d82cecf3c2e37 to your computer and use it in GitHub Desktop.
Multiprocess logging in python
import logging
import logging.config
import logging.handlers
import multiprocessing
import random
import threading
import time
from functools import partial
from traceback import format_exc
# Shamelessly building on: https://pymotw.com/2/multiprocessing/communication.html
class Consumer(multiprocessing.Process):
    """Worker process that pulls tasks from a joinable queue, executes them,
    and sends results back to the parent.

    Log records produced in the child are routed through ``log_queue`` via a
    ``QueueHandler`` so that a logger thread in the parent process can emit
    them with the parent's handlers (multiprocess-safe logging).
    """

    def __init__(self, task_queue, result_queue, log_queue, stop=None):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.log_queue = log_queue
        # Optional shared Event: set by any worker whose task fails, telling
        # the other workers to skip their remaining tasks.
        self.stop = stop

    def run(self):
        # Drop any handlers inherited from the parent; inside the child,
        # everything is funnelled back to the parent through the queue.
        logging.root.handlers = []
        root_logger = logging.getLogger()
        root_logger.setLevel(logging.DEBUG)
        root_logger.addHandler(logging.handlers.QueueHandler(self.log_queue))
        logger = logging.getLogger(self.name)
        while True:
            task = self.task_queue.get()
            if task is None:
                # Poison pill: acknowledge it and shut this worker down.
                logger.info('Task in %s is None so stopping consumer', self.name)
                self.task_queue.task_done()
                break
            if self.stop is not None and self.stop.is_set():
                # A sibling failed; drain the queue without executing.
                logger.info('Not executing task %s in queue %s due to global stop', task.args, self.name)
                self.task_queue.task_done()
                self.result_queue.put(None)
                continue
            try:
                logger.info('Starting task in queue: %s, %s', self.name, task.args)
                outcome = task(logger)
            except Exception as ex:
                logger.error("Task %s failed with exception:\n", task.args, exc_info=True)
                # Wrap the traceback in a plain Exception so it survives
                # pickling back through the result queue.
                outcome = Exception("{}\n\nOriginal:{}".format(ex, format_exc()))
                if self.stop is not None:
                    self.stop.set()
            finally:
                self.task_queue.task_done()
                self.result_queue.put(outcome)
        return
class LoggingSafeExecutor(object):
    """Context manager that fans work out to Consumer processes while keeping
    logging multiprocess-safe: child log records travel over a queue and are
    re-emitted by a single logger thread in the parent.

    Usage::

        with LoggingSafeExecutor(num_procs=4) as ex:
            for item in work:
                ex.add_job(task, item)
        print(ex.results)   # one entry per job, available after __exit__

    :param num_procs: number of Consumer worker processes to start.
    :param raise_errors: if True, __exit__ raises an Exception summarising
        all task failures.
    :param stop_on_first_error: if True, a shared Event is set on the first
        failure so remaining tasks are skipped (their result is None).
    """

    def __init__(self, num_procs, raise_errors=True, stop_on_first_error=True):
        self.num_procs = num_procs
        self._num_jobs = 0
        self.raise_errors = raise_errors
        self.stop_on_first_error = stop_on_first_error

    def __enter__(self):
        self._logging_queue = multiprocessing.Queue()
        self._tasks = multiprocessing.JoinableQueue()
        self._results = multiprocessing.Queue()
        # Shared event lets any failing worker tell the others to stop.
        if self.stop_on_first_error:
            self._stop = multiprocessing.Event()
        else:
            self._stop = None
        self._lp = threading.Thread(target=self.logger_thread)
        self._lp.start()
        self._consumers = [Consumer(self._tasks, self._results, self._logging_queue, self._stop)
                           for _ in range(self.num_procs)]
        for w in self._consumers:
            w.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # One poison pill per consumer so every worker loop terminates.
        for _ in range(self.num_procs):
            self._tasks.put(None)
        # BUG FIX: the original called self._tasks.terminate() when the stop
        # event was set, but multiprocessing queues have no terminate()
        # method, so every failing run died with AttributeError here.  The
        # call is also unnecessary: once the stop event is set, consumers
        # drain the remaining tasks themselves (calling task_done() and
        # queueing a None result for each), so the join() below returns.
        self._tasks.join()
        for consumer in self._consumers:
            consumer.join()
        # Sentinel stops the logger thread once all workers have exited
        # (and therefore can produce no more records).
        self._logging_queue.put(None)
        self._lp.join()
        # Exactly one result per submitted job (answer, wrapped Exception,
        # or None for a skipped task), so this cannot block.
        self.results = [self._results.get() for _ in range(self._num_jobs)]
        if self.raise_errors:
            errors = [r for r in self.results if isinstance(r, Exception)]
            if errors:
                msg = "The following exceptions were encountered:\n"
                msg += '\n'.join([str(r) for r in errors])
                raise Exception(msg)

    def logger_thread(self):
        """Runs in the parent: re-emit every log record arriving from workers."""
        while True:
            record = self._logging_queue.get()
            if record is None:
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)

    def add_job(self, task, args):
        """Queue ``task`` to be run in a worker as ``task(args, logger)``."""
        self._tasks.put(partial(task, args))
        self._num_jobs += 1
def job(arg, logger):
    """Demo task: sleep twice for a random duration, logging as it goes.

    Invoked by a Consumer as ``task(args, logger)`` via ``functools.partial``.
    Raises Exception when a drawn sleep time exceeds 0.8 seconds (exercises
    the executor's error-propagation path).
    """
    slept_so_far = 0
    for _ in range(2):
        nap = random.random()
        slept_so_far += nap
        logger.info('Args: %s', repr(arg))
        logger.info('Sleeping for: %s, total slept time:%s', nap, slept_so_far)
        if nap > 0.8:
            raise Exception('Sleep time=%s' % nap)
        time.sleep(nap)
if __name__ == '__main__':
    # Configure the parent's handlers; worker log records are replayed here
    # by the executor's logger thread.
    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger('')
    start = time.time()
    logger.info('Start time: %s', start)
    with LoggingSafeExecutor(num_procs=5) as executor:
        for job_idx in range(10):
            executor.add_job(job, job_idx)
    # results is populated during __exit__, one entry per job.
    print(executor.results)
    end = time.time()
    logger.info('Elapsed time: %s', end - start)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment