Created January 20, 2015 01:51
Python bug with logging and multiprocessing with maxtasksperchild=1
'''Demo of a bug with multiprocessing.Pool, logging, and maxtasksperchild=1.

This program uses Pool.imap_unordered() to execute 200 tasks. Each worker
task writes a log message and sleeps for a short time. The master process
uses a timeout on next() so that it logs a status message at least once
per second.

When it works, all 200 jobs complete quickly. When it breaks, roughly
195 of 200 jobs complete and next() never raises StopIteration, so the
master keeps polling forever.

If everything logs to logging.getLogger() and maxtasksperchild=1, it
usually breaks. It appears that jobs sometimes get lost and never
complete. We've observed that with maxtasksperchild=1 a new worker
process is sometimes created but no work is assigned to it. When that
happens, the task queue never runs to completion.

If we log straight to stderr or don't set maxtasksperchild, the run
completes.

The bug has been observed in Python 2.7.6 and Python 3.4.0 on Ubuntu 14.04.
This is a distillation of much more complex application-specific code.
Discussion of the bug and the original code can be found at:
https://github.com/openaddresses/machine/issues/51
https://github.com/openaddresses/machine/blob/7c3d0fba8ba0915af2101ace45dfaf5519d5ad85/openaddr/jobs.py

Nelson Minar <nelson@monkey.org> January 19, 2015
'''
import logging
import multiprocessing
import random
import sys
import time

### This pair of settings makes the run break
maxtasksperchild = 1
def log(msg, *args): logging.getLogger().info(msg, *args)

### Using either of these, it will work
# maxtasksperchild = None
# def log(msg, *args): sys.stderr.write((msg + '\n') % args)

def run_all_tasks(tasks):
    count = 0
    pool = multiprocessing.Pool(processes=8,
                                maxtasksperchild=maxtasksperchild)
    result_iter = pool.imap_unordered(fake_work, tasks, chunksize=1)
    while True:
        try:
            result_iter.next(timeout=1)
            count += 1
        except multiprocessing.TimeoutError:
            pass  # the finally clause logs the status
        except KeyboardInterrupt:
            raise
        except StopIteration:
            log("All jobs complete!")
            pool.close()
            return count
        finally:
            # Runs after every next() attempt, whether it succeeded or timed out.
            log("Job completion: %d/%d", count, len(tasks))
    # Defensive: the loop only exits via return or an exception.
    raise Exception("Job queue exited in an odd manner.")

def fake_work(duration):
    log("Sleeping for %.2f", duration)
    time.sleep(duration)

def main():
    logging.basicConfig(level=logging.DEBUG)
    tasks = [random.uniform(0.01, 0.1) for _ in range(200)]
    results = run_all_tasks(tasks)
    print("Finished! %d results" % results)

if __name__ == '__main__':
    main()
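When jobs get lost, the master loop in run_all_tasks() keeps calling next(timeout=1) forever, so the failure shows up as an endless status log rather than an error. A minimal sketch of one way to turn that hang into a visible failure, assuming it is acceptable to give up after a fixed period with no progress: run_with_watchdog() and its stall_limit parameter are hypothetical names, not part of the original gist, and this fake_work is a simplified variant that returns its input and does not log. It targets Python 3 (time.monotonic() and Pool as a context manager).

```python
import multiprocessing
import random
import time


def fake_work(duration):
    # Simplified stand-in for the gist's task: sleep briefly, return the input.
    time.sleep(duration)
    return duration


def run_with_watchdog(tasks, stall_limit=10.0, maxtasksperchild=None):
    """Hypothetical helper: collect imap_unordered results, but raise
    RuntimeError if no result arrives for stall_limit seconds."""
    results = []
    # Leaving the with-block calls pool.terminate(), which also kills any
    # workers still running if we bail out because of a stall.
    with multiprocessing.Pool(processes=8,
                              maxtasksperchild=maxtasksperchild) as pool:
        result_iter = pool.imap_unordered(fake_work, tasks, chunksize=1)
        last_progress = time.monotonic()
        while True:
            try:
                results.append(result_iter.next(timeout=1))
                last_progress = time.monotonic()
            except multiprocessing.TimeoutError:
                # Nothing arrived in the last second; fail instead of waiting forever.
                if time.monotonic() - last_progress > stall_limit:
                    raise RuntimeError("stalled at %d/%d jobs"
                                       % (len(results), len(tasks)))
            except StopIteration:
                return results


if __name__ == '__main__':
    tasks = [random.uniform(0.01, 0.1) for _ in range(200)]
    done = run_with_watchdog(tasks)
    print("Finished! %d results" % len(done))
```

This does not recover the lost jobs; it only makes the stall surface as an exception that reports how far the queue got. The stall limit is measured from the last completed result, so a long-running but healthy task needs a stall_limit larger than its expected duration.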