Skip to content

Instantly share code, notes, and snippets.

@Jerry0420
Last active June 17, 2023 09:41
Show Gist options
  • Save Jerry0420/86d3073fbacc95ca14ecf4815a9b1d1f to your computer and use it in GitHub Desktop.
Save Jerry0420/86d3073fbacc95ca14ecf4815a9b1d1f to your computer and use it in GitHub Desktop.
A logger tool written in Python.
  • Safe for concurrent logging from multiple processes.
  • No need for any dependency.
import logging
import logging.handlers
import multiprocessing
import os
import sys
from pathlib import Path
class MultiProcesses_Logger_Util:
    """Process-safe logging helper.

    Spawns a dedicated listener process that drains ``LogRecord`` objects
    from a managed queue and writes them to a rotating log file and to
    stdout. The main process's root logger is redirected into the same
    queue, so records from the main process and from worker processes
    (which attach a ``QueueHandler`` themselves) all end up in one place.

    Shutdown contract: ``close()`` enqueues a ``None`` sentinel; the
    listener exits its loop when it dequeues it.
    """

    def __init__(self) -> None:
        # Keep an explicit reference to the manager so its server process
        # stays alive for the lifetime of this object.
        self._manager = multiprocessing.Manager()
        self.queue = self._manager.Queue(-1)
        # 'fork' context so the listener inherits this module's state.
        # NOTE(review): fork is unavailable on Windows — confirm target OS.
        self.process = multiprocessing.get_context('fork').Process(
            target=self._process_target,
        )
        self.process.start()
        self.redirect_main_process_log_to_queue()

    def get_log_file_path(self) -> str:
        """Return the log file path, creating its parent directory."""
        # TODO: make the log location configurable.
        log_file_dir = Path(__file__).parent / 'logs'
        log_file_dir.mkdir(parents=True, exist_ok=True)
        return str(log_file_dir / 'multip.log')

    def redirect_main_process_log_to_queue(self):
        """Route the main process's root-logger records into the queue.

        A bare ``logging.Handler`` carrying a filter that always returns a
        falsy value is used on purpose: the filter enqueues the record,
        and the falsy return stops ``Handler.handle`` from ever calling
        ``emit`` (which would raise ``NotImplementedError`` on the base
        class).
        """
        main_process_id = os.getpid()

        def enqueue_record(record: logging.LogRecord):
            # Forked children inherit this handler via the root logger;
            # the pid check stops them from enqueueing records twice
            # (their own QueueHandler already did it).
            if record.process == main_process_id:
                self.queue.put_nowait(record)
            return None  # falsy: suppress emission by this handler

        root = logging.getLogger()
        root.setLevel(logging.INFO)
        handler = logging.Handler()
        handler.addFilter(enqueue_record)
        root.addHandler(handler)

    def init_logger_configure(self):
        """Configure the listener process's handlers (file + stdout)."""
        root = logging.getLogger()
        root.setLevel(logging.INFO)
        # One formatter shared by both handlers keeps the formats in sync.
        formatter = logging.Formatter(
            fmt='%(asctime)s | %(name)s | %(levelname)s | %(pathname)s:%(lineno)d | %(process)d | %(message)s',
            datefmt="%m-%d %H:%M:%S",
        )
        # TODO: make rotation size / backup count configurable.
        file_handler = logging.handlers.RotatingFileHandler(
            self.get_log_file_path(),
            mode='w',
            # maxBytes=5000,
            # backupCount=0,
            encoding='utf-8',
        )
        file_handler.setLevel(logging.INFO)
        file_handler.setFormatter(formatter)
        root.addHandler(file_handler)

        std_handler = logging.StreamHandler(sys.stdout)
        std_handler.setLevel(logging.DEBUG)
        std_handler.setFormatter(formatter)
        root.addHandler(std_handler)

    def _process_target(self):
        """Listener loop: drain the queue until the ``None`` sentinel."""
        self.init_logger_configure()
        while True:
            record: logging.LogRecord = self.queue.get()
            if record is None:
                break
            # Re-dispatch through the record's own logger so level and
            # handler configuration of the listener process apply.
            logger = logging.getLogger(record.name)
            logger.handle(record)

    def close(self):
        """Stop the listener after draining any queued records.

        Bug fix: the original joined with ``timeout=0`` (which returns
        immediately) and then terminated the listener, potentially
        killing it while records were still queued. Enqueue the sentinel
        so the listener exits cleanly, wait for it, and terminate only
        as a last resort.
        """
        self.queue.put_nowait(None)  # sentinel: listener breaks its loop
        self.process.join(timeout=5)
        if self.process.is_alive():
            self.process.terminate()
import logging
import logging.handlers
import multiprocessing
from functools import partial
from logger_util import MultiProcesses_Logger_Util
def process_target(queue: multiprocessing.Queue, input: int):
    """Worker entry point: log *input* through a queue-backed logger.

    Pool workers are reused across tasks, so the 'sub' logger may already
    carry a ``QueueHandler`` from an earlier call in this process. Attach
    one only if none is present anywhere in the handler list — the
    original only inspected ``handlers[0]``, which could add duplicates
    when another handler happened to sit in front, causing each record to
    be enqueued multiple times.
    """
    sub_logger = logging.getLogger('sub')
    sub_logger.setLevel(logging.INFO)
    already_attached = any(
        isinstance(h, logging.handlers.QueueHandler)
        for h in sub_logger.handlers
    )
    if not already_attached:
        # Build the handler only when it is actually needed.
        sub_logger.addHandler(logging.handlers.QueueHandler(queue))
    sub_logger.info(input)
def main():
    """Fan five inputs out to a two-worker pool, then shut logging down.

    ``pool.map`` is synchronous, so by the time it returns every worker
    has logged its input; the ``None`` sentinel then tells the listener
    process to drain the queue and exit.
    """
    logger_util = MultiProcesses_Logger_Util()
    pool = multiprocessing.Pool(2)
    try:
        pool.map(partial(process_target, logger_util.queue), range(5))
    finally:
        # Always release the worker processes, even if map raised.
        pool.close()
        pool.join()
    logger_util.queue.put_nowait(None)  # sentinel for the listener loop
    logger_util.close()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment