Skip to content

Instantly share code, notes, and snippets.

@NelsonMinar
Created January 20, 2015 01:51
Show Gist options
  • Save NelsonMinar/022794b6a709ea5b7682 to your computer and use it in GitHub Desktop.
Save NelsonMinar/022794b6a709ea5b7682 to your computer and use it in GitHub Desktop.
Python bug with logging and multiprocessing with maxtasksperchild=1
'''Demo of a bug with multiprocessing.Pool, logging, and maxtasksperchild=1.
This program uses Pool.imap_unordered() to execute 200 tasks. Each worker
task writes a log message and sleeps a short time. The master process
uses a timeout on next() to log a status message occasionally.
When it works, 200 jobs are completed quickly. When it breaks, roughly
195 of 200 jobs will have completed and next() never raises StopIteration.
If everything logs to logging.getLogger() and maxtasksperchild=1, it
usually breaks. It appears that sometimes jobs just get lost and don't
complete. We've observed that with maxtasksperchild=1 sometimes a new
worker process gets created but no work assigned to it. When that happens
the task queue never runs to completion.
If we log straight to stderr or don't set maxtasksperchild, the run
completes.
The bug has been observed in Python 2.7.6 and Python 3.4.0 on Ubuntu 14.04
This is a distillation of much more complex application-specific code.
Discussion of the bug and original code can be found at
https://github.com/openaddresses/machine/issues/51
https://github.com/openaddresses/machine/blob/7c3d0fba8ba0915af2101ace45dfaf5519d5ad85/openaddr/jobs.py
Nelson Minar <nelson@monkey.org> January 19, 2015
'''
import logging, multiprocessing, time, os, sys, random
### This pair of settings makes the run break
maxtasksperchild = 1
def log(msg, *args): logging.getLogger().info(msg, *args)
### Using either of these, it will work
# maxtasksperchild = None
# def log(msg, *args): sys.stderr.write((msg + '\n') % args)
def run_all_tasks(tasks):
count = 0
pool = multiprocessing.Pool(processes = 8,
maxtasksperchild = maxtasksperchild)
result_iter = pool.imap_unordered(fake_work, tasks, chunksize=1)
while True:
try:
result_iter.next(timeout=1)
count += 1
except multiprocessing.TimeoutError:
pass # finally clause prints status
except KeyboardInterrupt:
raise
except StopIteration:
log("All jobs complete!")
pool.close()
return count
finally:
log("Job completion: %d/%d", count, len(tasks))
raise Exception("Job queue exited in an odd manner.")
def fake_work(duration):
log("Sleeping for %.2f" % duration)
time.sleep(duration)
def main():
logging.basicConfig(level=logging.DEBUG)
tasks = [random.uniform(0.01, 0.1) for n in range(0, 200)]
results = run_all_tasks(tasks)
print("Finished! %d results" % results)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment