Skip to content

Instantly share code, notes, and snippets.

@notmyname
Created March 28, 2012 01:04
Show Gist options
  • Save notmyname/2222604 to your computer and use it in GitHub Desktop.
Save notmyname/2222604 to your computer and use it in GitHub Desktop.
multiprocessing queues hang
#!/usr/bin/env python
import multiprocessing
import sys
import Queue
import time
import traceback
# Deliberately problematic exception: the three *required* constructor
# arguments mean the multiprocessing.Queue reader thread cannot re-create
# the instance when unpickling it in the parent -- this is the bug the
# gist demonstrates (see the author's note at the bottom of the file).
class FooErr(Exception):
    def __init__(self, a, b, c):
        # Intentionally discards its arguments and skips Exception.__init__.
        pass
def foo_error(in_queue, out_queue):
    """Worker for the BROKEN demo: ships a raw FooErr back to the parent.

    Consumes items from in_queue until a None sentinel arrives.  For each
    item it puts a ~1 MB payload on out_queue and then raises FooErr.
    """
    try:
        while True:
            item = in_queue.get()
            if item is None:
                # no more work to process
                break
            # put something large in the queue -- the large payload fills
            # the queue's pipe buffer, which is what later prevents the
            # child from exiting (the feeder must flush before join)
            out_queue.put('x' * 1000000)
            # exception
            raise FooErr(1, 2, 3)
    except Exception, err:
        # Passing the raw exception to the parent: unpickling it in the
        # parent's queue-reader thread fails, hanging that thread.
        out_queue.put(err)
    finally:
        in_queue.close()
        out_queue.close()
def caller_error():
    """Drive foo_error workers and yield each result payload.

    This generator demonstrates the hang: the FooErr instances the
    workers put on out_queue cannot be unpickled by the queue reader.
    """
    worker_count = 4
    r = []  # child Process handles, so we can poll is_alive()
    in_queue = multiprocessing.Queue()
    out_queue = multiprocessing.Queue()
    for _ in xrange(worker_count):
        p = multiprocessing.Process(target=foo_error,
                                    args=(in_queue, out_queue))
        p.start()
        r.append(p)
    work = 10
    for i in xrange(work):
        in_queue.put(i)
    # one None sentinel per worker so every child sees shutdown
    for i in range(worker_count):
        in_queue.put(None)
    while True:
        try:
            data = out_queue.get_nowait()
        except Queue.Empty:
            # nothing ready yet; poll again shortly
            time.sleep(.2)
        else:
            if isinstance(data, Exception):
                print 'got exception', data
            else:
                yield data
        if not any(p.is_alive() for p in r) and out_queue.empty():
            # all the workers are done and nothing is in the queue
            break
###############################################################################
class WorkerError(Exception):
    """Picklable wrapper used to ship a child-process failure to the parent."""
def foo_no_error(in_queue, out_queue):
    """Worker using the FIXED pattern: failures are shipped to the parent
    wrapped in a picklable WorkerError instead of the raw exception.

    Consumes items from in_queue until a None sentinel arrives.  For each
    item it puts a ~1 MB payload on out_queue and then raises FooErr to
    simulate a crash inside the child.
    """
    try:
        while True:
            item = in_queue.get()
            if item is None:
                # no more work to process
                break
            # put something large in the queue
            out_queue.put('x' * 1000000)
            # simulate a failure in the worker
            raise FooErr(1, 2, 3)
    except Exception:
        t, v, tb = sys.exc_info()
        e = WorkerError()
        # Carry the traceback AND the exception type/message as plain
        # strings: format_tb alone drops the final "FooErr: ..." line, so
        # the parent could not see what actually failed.  Everything that
        # crosses the queue is a list of str, which pickles cleanly.
        e.tb = traceback.format_tb(tb) + traceback.format_exception_only(t, v)
        out_queue.put(e)
    finally:
        in_queue.close()
        out_queue.close()
def caller_no_error():
    """Drive foo_no_error workers and yield each result payload.

    Completes without hanging because the workers report failures as
    picklable WorkerError instances rather than raw FooErr objects.
    """
    worker_count = 4
    r = []  # child Process handles, so we can poll is_alive()
    in_queue = multiprocessing.Queue()
    out_queue = multiprocessing.Queue()
    for _ in xrange(worker_count):
        p = multiprocessing.Process(target=foo_no_error,
                                    args=(in_queue, out_queue))
        p.start()
        r.append(p)
    work = 10
    for i in xrange(work):
        in_queue.put(i)
    # one None sentinel per worker so every child sees shutdown
    for i in range(worker_count):
        in_queue.put(None)
    while True:
        try:
            data = out_queue.get_nowait()
        except Queue.Empty:
            # nothing ready yet; poll again shortly
            time.sleep(.2)
        else:
            if isinstance(data, WorkerError):
                print 'got exception', ''.join(data.tb)
            else:
                yield data
        if not any(p.is_alive() for p in r) and out_queue.empty():
            # all the workers are done and nothing is in the queue
            break
# Demo driver: run the fixed pattern first, then the broken one, so the
# fixed output is visible before the interpreter hangs.
print 'no error test (the problem is fixed)'
for x in caller_no_error():
    print len(x)
# run the problem version last since it will cause the process to hang
print 'error test (the problem is not fixed)'
for x in caller_error():
    print len(x)
@notmyname
Copy link
Author

The problem is caused by the exception raised in the child process, which has a constructor with required parameters. When the exception instance is unpickled (by the multiprocessing.Queue reader), the unpickling fails and causes the queue thread to hang. Because data is being written to the Queue, the buffer fills up and the child processes can't exit (because the data must be flushed before the thread can join).

The solution with this pattern is to wrap the exception from the child and pass that object to the parent reader. This still allows for the parent to report and handle errors in the child processes, but prevents the hang caused by the unpickling exception.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment