Demo of logging in a multi-processing context.
"""Demo of logging in a multi-processing context. | |
Based on the Python "Logging Cookbook": | |
https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes | |
Note on `QueueListener`: | |
The main difference between the Cookbook example and this demo is that I use | |
the `QueueListener` convenience class, whereas they write a bespoke | |
`listener_process` function which does more or less the same thing as | |
QueueListener. | |
I put this demo together (in part) because there is no `QueueListener` | |
example usage in the docs. If you have specific needs that are not met by | |
QueueListener, you can subclass it, or write your own from scratch as in the | |
Logging Cookbook example. | |
Subclassing `QueueListener` to customize its behavior, or writing your own | |
from scratch, should not be difficult. The stdlib implementation is not | |
complicated and should be easy to modify. The source is here: | |
https://github.com/python/cpython/blob/v3.9.6/Lib/logging/handlers.py#L1436-L1543 | |
Possible problems with QueueListener include: | |
* It runs in its own background thread; you might want to run it in a | |
separate process, or something else. | |
* The `QueueListener` uses `Handler` objects to handle log messages, | |
without going through a `Logger`. This might be a detriment if you want | |
more control over log levels, etc. | |
""" | |
import logging
import multiprocessing
import string
import sys
from concurrent.futures import ProcessPoolExecutor
from logging.handlers import QueueHandler, QueueListener
from random import Random
from time import sleep
from typing import Sequence

_logger_name = "demo"
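
# --- Sketch: routing queued records through a Logger -------------------------
# The docstring above notes that QueueListener hands each record straight to
# its Handler objects, bypassing Logger-level configuration. This subclass is
# a minimal sketch of one way around that; the class name and approach are
# hypothetical additions, not part of the stdlib or used by this demo.
class LoggerQueueListener(QueueListener):
    """Handle each queued record via the Logger named on the record."""

    def handle(self, record: logging.LogRecord) -> None:
        # Look up the record's own logger, so per-logger levels and filters
        # apply on the listener side as well.
        logger = logging.getLogger(record.name)
        if logger.isEnabledFor(record.levelno):
            logger.handle(record)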
def process_item(item: str) -> tuple[int, int]:
    """Process one item of data (work simulated with `sleep`), with logging."""
    logger = logging.getLogger(_logger_name)
    logger.info("Working on item: %r", item)

    total = 0
    skipped = 0
    for c in item:
        if c == "-":
            skipped += 1
            logger.debug("Skipping %r, skipped = %d", c, skipped)
        else:
            sleep(0.25)
            val = ord(c)
            total += val
            logger.debug("%r -> %d, total = %d", c, val, total)

    logger.info("Done! total = %d, skipped = %d", total, skipped)
    return total, skipped
def init_worker(log_queue: multiprocessing.Queue, log_level: int) -> None:
    """Initialize a worker process: route its log records onto the queue."""
    logger = logging.getLogger(_logger_name)
    logger.setLevel(log_level)
    handler = QueueHandler(log_queue)
    logger.addHandler(handler)
def generate_fake_data() -> Sequence[str]:
    """Generate some fake data for the demo."""
    alphabet = string.ascii_letters
    rng = Random(12345)

    items = []
    for _ in range(30):
        # Choose up to 20 letters
        n_letters = rng.randint(1, 20)
        letters = rng.choices(alphabet, k=n_letters)

        # Replace up to 3 of the letters with dashes
        n_dashes = rng.randint(0, min(n_letters, 3))
        dash_positions = rng.sample(range(n_letters), k=n_dashes)
        for i in dash_positions:
            letters[i] = "-"

        # Collect into a string
        item = "".join(letters)
        items.append(item)

    return items
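
# --- Sketch: running the listener in a separate process ----------------------
# The docstring notes that QueueListener runs in a background *thread*, and
# that you might want a separate *process* instead. This is a minimal sketch
# of that idea, modeled on the Logging Cookbook's bespoke listener; the
# function name is hypothetical and it is not used by this demo.
def listener_process(log_queue: multiprocessing.Queue) -> None:
    """Consume records from the queue until a None sentinel arrives."""
    # Configure handlers inside the child process: Handler objects hold locks
    # and streams, so they generally cannot be pickled across processes.
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(logging.Formatter("%(processName)s: %(message)s"))
    while True:
        record = log_queue.get()
        if record is None:  # sentinel: stop listening
            break
        handler.handle(record)

# Usage would look roughly like:
#     proc = multiprocessing.Process(target=listener_process, args=(log_queue,))
#     proc.start()
#     ...  # run workers
#     log_queue.put(None)
#     proc.join()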
def main():
    # -- Set up logging -- #
    log_level = logging.INFO
    log_format = "%(name)s:%(levelname)s:%(processName)s:%(asctime)s: %(message)s"

    # Set up a handler and formatter as usual. We pass sys.stdout explicitly,
    # because StreamHandler writes to stderr by default.
    formatter = logging.Formatter(log_format)
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(formatter)

    # Set up the log listener.
    # The log_listener will loop forever (in its own thread), handling log
    # messages as they arrive on the log_queue. See the top-level docstring
    # for more detail on this.
    # Note that we do not need to get a Logger object at all to run this!
    log_queue = multiprocessing.Queue()
    log_listener = QueueListener(log_queue, stdout_handler)
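    # An alternative, not used in this demo: QueueListener also accepts
    # respect_handler_level=True, which makes it honor each handler's own
    # level instead of passing every record to every handler:
    #
    #     log_listener = QueueListener(
    #         log_queue, stdout_handler, respect_handler_level=True
    #     )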
    # -- Run the application -- #

    # Start the background thread that listens for and handles log messages.
    log_listener.start()

    # Generate some data to process.
    items = generate_fake_data()

    # Run some worker processes that emit log messages. `Executor.map` returns
    # a lazy iterator, so we consume it with `list()`; together with leaving
    # the `with` block (which calls `shutdown(wait=True)`), this waits for all
    # work to finish and surfaces any exceptions raised in the workers.
    n_workers = 4
    executor = ProcessPoolExecutor(
        n_workers, initializer=init_worker, initargs=(log_queue, log_level)
    )
    with executor:
        results = list(
            executor.map(process_item, items, chunksize=len(items) // n_workers)
        )
# Put a "shutdown" sentinel on the end of the logging queue. | |
# Note that this calls `.put_nowait()` on the queue, so you might need to do some | |
# error handling here, or subclass QueueListener to behave differently. | |
log_listener.enqueue_sentinel() | |
print("All done!") | |
if __name__ == "__main__":
    main()