Created
March 28, 2012 01:04
-
-
Save notmyname/2222604 to your computer and use it in GitHub Desktop.
multiprocessing queues hang
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import multiprocessing | |
import sys | |
import Queue | |
import time | |
import traceback | |
class FooErr(Exception):
    """Exception whose constructor requires exactly three positional arguments.

    The arguments are accepted and then ignored: this ``__init__`` never
    forwards them to ``Exception.__init__``.  Per the notes at the end of this
    file, an instance of a class shaped like this cannot be unpickled by the
    reader of a ``multiprocessing.Queue`` (observed on the Python 2
    interpreter this script targets), which is what makes ``caller_error``
    hang.  It is intentionally left this way to reproduce the problem.
    """

    def __init__(self, a, b, c):
        """Accept ``a``, ``b`` and ``c`` and deliberately discard them."""
def foo_error(in_queue, out_queue):
    """Worker that reproduces the hang by forwarding the raw exception.

    Reads items from ``in_queue`` until it receives the ``None`` sentinel,
    which ends the worker quietly.  For any other item it puts a large
    (1,000,000 character) string on ``out_queue`` and then raises ``FooErr``;
    because every real item ends in that raise, each worker handles at most
    one item in practice.  The caught exception object itself is put on
    ``out_queue``.  Both queues are closed on the way out, in the order
    ``in_queue`` then ``out_queue``.
    """
    try:
        while True:
            if in_queue.get() is None:
                break  # sentinel: nothing left to process
            payload = 'x' * 1000000
            out_queue.put(payload)
            raise FooErr(1, 2, 3)
    except Exception as exc:
        # Forward the exception object unchanged -- this is the behaviour
        # under test.
        out_queue.put(exc)
    finally:
        for q in (in_queue, out_queue):
            q.close()
def caller_error():
    """Run ``foo_error`` workers and yield their results (the hanging variant).

    Starts four worker processes that share one input and one output
    ``multiprocessing.Queue``.  It queues the integers 0-9 followed by one
    ``None`` sentinel per worker, then polls the output queue every 0.2 s
    when it is empty.  Received ``Exception`` instances are printed; every
    other result is yielded.  Polling stops once no worker is alive and the
    output queue is empty.
    """
    worker_count = 4
    work = 10
    in_queue = multiprocessing.Queue()
    out_queue = multiprocessing.Queue()

    workers = []
    while len(workers) < worker_count:
        proc = multiprocessing.Process(target=foo_error,
                                       args=(in_queue, out_queue))
        proc.start()
        workers.append(proc)

    # Real work items first, then one stop sentinel per worker.
    for item in list(range(work)) + [None] * worker_count:
        in_queue.put(item)

    finished = False
    while not finished:
        try:
            data = out_queue.get_nowait()
        except Queue.Empty:
            time.sleep(.2)
        else:
            if not isinstance(data, Exception):
                yield data
            else:
                print('%s %s' % ('got exception', data))
        # Done once every worker has exited and nothing is left unread.
        finished = (not any(w.is_alive() for w in workers)
                    and out_queue.empty())
############################################################################### | |
class WorkerError(Exception):
    """Stand-in for an exception raised inside a worker process.

    Unlike ``FooErr`` it keeps the default ``Exception`` constructor, which
    has no required arguments.  Workers attach the formatted traceback to
    it as a ``tb`` attribute (a list of strings) before putting it on the
    result queue.
    """
def foo_no_error(in_queue, out_queue):
    """Worker that reports its failure as a ``WorkerError`` instead of the raw exception.

    Reads items from ``in_queue`` until it receives the ``None`` sentinel.
    For each real item it puts a large (1,000,000 character) string on
    ``out_queue`` and then raises ``FooErr``, so in practice each worker
    handles at most one item.

    The raised exception is not forwarded as-is.  A ``WorkerError`` is put
    on ``out_queue`` instead: its message is the last line of the formatted
    traceback (the exception type and message), and its ``tb`` attribute
    is the full formatted traceback as a list of strings.  Both queues are
    closed on exit.
    """
    try:
        while True:
            item = in_queue.get()
            if item is None:
                # no more work to process
                break
            # put something large in the queue
            out_queue.put('x' * 1000000)
            # exception
            raise FooErr(1, 2, 3)
    except Exception:
        # format_exception (unlike format_tb) also emits the
        # "Traceback (most recent call last):" header and the exception
        # type/message line, so the parent learns *what* failed, not only
        # where.  sys.exc_info() is passed straight through rather than
        # unpacked into locals, avoiding the traceback reference cycle that
        # the sys.exc_info() documentation warns about.
        lines = traceback.format_exception(*sys.exc_info())
        error = WorkerError(lines[-1].strip())
        error.tb = lines
        out_queue.put(error)
    finally:
        in_queue.close()
        out_queue.close()
def caller_no_error():
    """Run ``foo_no_error`` workers and yield their results.

    Starts four worker processes that share one input and one output
    ``multiprocessing.Queue``.  It queues the integers 0-9 followed by one
    ``None`` sentinel per worker, then polls the output queue every 0.2 s
    when it is empty.  For a ``WorkerError`` the carried traceback text
    (``data.tb``) is printed; every other result is yielded.  Polling stops
    once no worker is alive and the output queue is empty.
    """
    worker_count = 4
    work = 10
    in_queue = multiprocessing.Queue()
    out_queue = multiprocessing.Queue()

    workers = []
    while len(workers) < worker_count:
        proc = multiprocessing.Process(target=foo_no_error,
                                       args=(in_queue, out_queue))
        proc.start()
        workers.append(proc)

    # Real work items first, then one stop sentinel per worker.
    for item in list(range(work)) + [None] * worker_count:
        in_queue.put(item)

    finished = False
    while not finished:
        try:
            data = out_queue.get_nowait()
        except Queue.Empty:
            time.sleep(.2)
        else:
            if not isinstance(data, WorkerError):
                yield data
            else:
                print('%s %s' % ('got exception', ''.join(data.tb)))
        # Done once every worker has exited and nothing is left unread.
        finished = (not any(w.is_alive() for w in workers)
                    and out_queue.empty())
if __name__ == '__main__':
    # Guard the driver so it only runs when the file is executed as a
    # script.  multiprocessing's "spawn" start method re-imports the main
    # module in every child process; without this guard each child would
    # start its own set of workers.
    print('no error test (the problem is fixed)')
    for x in caller_no_error():
        print(len(x))
    # run the problem version last since it will cause the process to hang
    print('error test (the problem is not fixed)')
    for x in caller_error():
        print(len(x))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The problem is caused by the exception raised in the child process, whose class has a constructor with required parameters. When the exception instance is unpickled (by the multiprocessing.Queue reader), unpickling fails and causes the queue thread to hang. Because data is still being written to the Queue, the buffer fills up and the child processes can't exit: each child's queue feeder thread must flush its buffered data into the pipe before that thread can be joined, and the child waits for that join before exiting.
The workaround used here is to wrap the child's exception, together with its formatted traceback, in an exception type whose constructor takes no required arguments (WorkerError), and to pass that object to the parent reader instead. This still allows the parent to report and handle errors in the child processes, but prevents the hang caused by the unpickling failure.