"""Demo of logging in a multi-processing context.
Based on the Python "Logging Cookbook":
https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes
Note on `QueueListener`:
The main difference between the Cookbook example and this demo is that I use
the `QueueListener` convenience class, whereas they write a bespoke
`listener_process` function which does more or less the same thing as
QueueListener.
I put this demo together (in part) because there is no `QueueListener`
example usage in the docs. If you have specific needs that are not met by
QueueListener, you can subclass it, or write your own from scratch as in the
Logging Cookbook example.
Subclassing `QueueListener` to customize its behavior, or writing your own
from scratch, should not be difficult. The stdlib implementation is not
complicated and should be easy to modify. The source is here:
https://github.com/python/cpython/blob/v3.9.6/Lib/logging/handlers.py#L1436-L1543
Possible problems with QueueListener include:
* It runs in its own background thread; you might want to run it in a
separate process, or something else.
* The `QueueListener` uses `Handler` objects to handle log messages,
without going through a `Logger`. This might be a detriment if you want
more control over log levels, etc.
"""
import logging
import multiprocessing
import string
from concurrent.futures import ProcessPoolExecutor
from logging.handlers import QueueHandler, QueueListener
from random import Random
from time import sleep
from typing import Sequence
_logger_name = "demo"


def process_item(item: str) -> tuple[int, int]:
    """Process one item of data, CPU-intensively, with logging."""
    logger = logging.getLogger(_logger_name)
    logger.info("Working on item: %r", item)
    total = 0
    skipped = 0
    for c in item:
        if c == "-":
            skipped += 1
            logger.debug("Skipping %r, skipped = %d", c, skipped)
        else:
            sleep(0.25)
            val = ord(c)
            total += val
            logger.debug("%r -> %d, total = %d", c, val, total)
    logger.info("Done! total = %d, skipped = %d", total, skipped)
    return total, skipped


def init_worker(log_queue: multiprocessing.Queue, log_level: int) -> None:
    """Initialize a worker process."""
    logger = logging.getLogger(_logger_name)
    logger.setLevel(log_level)
    handler = QueueHandler(log_queue)
    logger.addHandler(handler)


def generate_fake_data() -> Sequence[str]:
    """Generate some fake data for the demo."""
    alphabet = string.ascii_letters
    rng = Random(12345)
    items = []
    for _ in range(30):
        # Choose up to 20 letters
        n_letters = rng.randint(1, 20)
        letters = rng.choices(alphabet, k=n_letters)
        # Insert up to 3 dashes
        n_dashes = rng.randint(0, min(n_letters, 3))
        dash_positions = rng.sample(list(range(n_letters)), k=n_dashes)
        for i in dash_positions:
            letters[i] = "-"
        # Collect into a string
        item = "".join(letters)
        items.append(item)
    return items


def main():
    # -- Set up logging -- #
    log_level = logging.INFO
    log_format = "%(name)s:%(levelname)s:%(processName)s:%(asctime)s: %(message)s"

    # Set up a handler and formatter as usual. Note that a `StreamHandler`
    # with no arguments writes to stderr.
    formatter = logging.Formatter(log_format)
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)

    # Set up the log listener.
    # The log_listener will loop forever (in its own thread), handling log
    # messages as they arrive on the log_queue. See the top-level docstring
    # for more detail on this.
    # Note that we do not need to actually get a Logger object to run this!
    log_queue = multiprocessing.Queue()
    log_listener = QueueListener(log_queue, console_handler)

    # -- Run the application -- #

    # Start a background thread that listens for and handles log messages.
    log_listener.start()

    # Generate some data to process.
    items = generate_fake_data()

    # Run some worker processes that emit log messages. `Executor.map` submits
    # all of the work up front, and exiting the `with` block calls
    # `shutdown(wait=True)`, so we don't have to explicitly "join" anything to
    # wait for completion.
    n_workers = 4
    executor = ProcessPoolExecutor(
        n_workers, initializer=init_worker, initargs=(log_queue, log_level)
    )
    with executor:
        results = executor.map(process_item, items, chunksize=len(items) // n_workers)

    # Shut down the log listener. `stop()` puts a "shutdown" sentinel on the
    # logging queue and waits for the listener thread to drain it. Note that
    # the sentinel is enqueued with `.put_nowait()`, so you might need to do
    # some error handling here, or subclass QueueListener to behave
    # differently (see the sketch after `main()`).
    log_listener.stop()

    print("All done!")


if __name__ == "__main__":
    main()