Skip to content

Instantly share code, notes, and snippets.

@stsievert
Last active September 2, 2016 02:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save stsievert/43391861410203ebbec6511ef3d0e4e4 to your computer and use it in GitHub Desktop.
Stack traces from using joblib (Parallel/delayed) inside functions submitted via dask.distributed.Executor.map
---------------------------------------------------------------------------
PicklingError Traceback (most recent call last)
<ipython-input-6-94dad265e0f3> in <module>()
17 # dask.distributed
18 b = e.map(repeated_tests, values)
---> 19 b = e.gather(b)
20 print(b)
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/distributed/executor.py in gather(self, futures, errors, maxsize)
826 return (self.gather(f, errors=errors) for f in futures)
827 else:
--> 828 return sync(self.loop, self._gather, futures, errors=errors)
829
830 @gen.coroutine
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
114 e.wait(1000000)
115 if error[0]:
--> 116 six.reraise(type(error[0]), error[0], traceback[0])
117 else:
118 return result[0]
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
684 if value.__traceback__ is not tb:
685 raise value.with_traceback(tb)
--> 686 raise value
687
688 else:
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/distributed/utils.py in f()
100 def f():
101 try:
--> 102 result[0] = yield gen.maybe_future(func(*args, **kwargs))
103 except Exception as exc:
104 logger.exception(exc)
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/tornado/gen.py in run(self)
1006
1007 try:
-> 1008 value = future.result()
1009 except Exception:
1010 self.had_exception = True
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py in result(self, timeout)
230 return self._result
231 if self._exc_info is not None:
--> 232 raise_exc_info(self._exc_info)
233 self._check_done()
234 return self._result
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/tornado/util.py in raise_exc_info(exc_info)
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/tornado/gen.py in run(self)
1012
1013 if exc_info is not None:
-> 1014 yielded = self.gen.throw(*exc_info)
1015 exc_info = None
1016 else:
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/distributed/executor.py in _gather(self, futures, errors)
745 six.reraise(type(d['exception']),
746 d['exception'],
--> 747 d['traceback'])
748 except KeyError:
749 six.reraise(CancelledError,
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
683 value = tp()
684 if value.__traceback__ is not tb:
--> 685 raise value.with_traceback(tb)
686 raise value
687
<ipython-input-5-5207fbeca17e> in repeated_tests()
5 def repeated_tests(x, n=10):
6 y = Parallel(n_jobs=-1)(delayed(model_performance)(x)
----> 7 for x in range(n))
8 return sum(y) / n
9
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/joblib/parallel.py in __call__()
752 # was dispatched. In particular this covers the edge
753 # case of Parallel used with an exhausted iterator.
--> 754 while self.dispatch_one_batch(iterator):
755 self._iterating = True
756 else:
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/joblib/parallel.py in dispatch_one_batch()
597
598 with self._lock:
--> 599 tasks = BatchedCalls(itertools.islice(iterator, batch_size))
600 if len(tasks) == 0:
601 # No more tasks available in the iterator: tell caller to stop.
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/joblib/parallel.py in __init__()
121
122 def __init__(self, iterator_slice):
--> 123 self.items = list(iterator_slice)
124 self._size = len(self.items)
125
<ipython-input-5-5207fbeca17e> in <genexpr>()
5 def repeated_tests(x, n=10):
6 y = Parallel(n_jobs=-1)(delayed(model_performance)(x)
----> 7 for x in range(n))
8 return sum(y) / n
9
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/joblib/parallel.py in delayed()
177 # using with multiprocessing:
178 if check_pickle:
--> 179 pickle.dumps(function)
180
181 def delayed_function(*args, **kwargs):
PicklingError: Can't pickle <function model_performance at 0x7f0fac1c8950>: attribute lookup model_performance on __main__ failed
---------------------------------------------------------------------------
PicklingError Traceback (most recent call last)
<ipython-input-5-beac8a572fd3> in <module>()
18 # dask.distributed
19 b = e.map(repeated_tests, model_params)
---> 20 b = e.gather(b)
21 print(b)
22
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/distributed/executor.py in gather(self, futures, errors, maxsize)
826 return (self.gather(f, errors=errors) for f in futures)
827 else:
--> 828 return sync(self.loop, self._gather, futures, errors=errors)
829
830 @gen.coroutine
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
114 e.wait(1000000)
115 if error[0]:
--> 116 six.reraise(type(error[0]), error[0], traceback[0])
117 else:
118 return result[0]
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
684 if value.__traceback__ is not tb:
685 raise value.with_traceback(tb)
--> 686 raise value
687
688 else:
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/distributed/utils.py in f()
100 def f():
101 try:
--> 102 result[0] = yield gen.maybe_future(func(*args, **kwargs))
103 except Exception as exc:
104 logger.exception(exc)
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/tornado/gen.py in run(self)
1006
1007 try:
-> 1008 value = future.result()
1009 except Exception:
1010 self.had_exception = True
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py in result(self, timeout)
230 return self._result
231 if self._exc_info is not None:
--> 232 raise_exc_info(self._exc_info)
233 self._check_done()
234 return self._result
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/tornado/util.py in raise_exc_info(exc_info)
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/tornado/gen.py in run(self)
1012
1013 if exc_info is not None:
-> 1014 yielded = self.gen.throw(*exc_info)
1015 exc_info = None
1016 else:
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/distributed/executor.py in _gather(self, futures, errors)
745 six.reraise(type(d['exception']),
746 d['exception'],
--> 747 d['traceback'])
748 except KeyError:
749 six.reraise(CancelledError,
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
683 value = tp()
684 if value.__traceback__ is not tb:
--> 685 raise value.with_traceback(tb)
686 raise value
687
<ipython-input-5-beac8a572fd3> in repeated_tests()
10 with joblib.parallel_backend('distributed', scheduler_host=host):
11 y = Parallel()(delayed(model_performance)(x)
---> 12 for x in range(n))
13 return sum(y) / n
14
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/joblib/parallel.py in __call__()
752 # was dispatched. In particular this covers the edge
753 # case of Parallel used with an exhausted iterator.
--> 754 while self.dispatch_one_batch(iterator):
755 self._iterating = True
756 else:
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/joblib/parallel.py in dispatch_one_batch()
597
598 with self._lock:
--> 599 tasks = BatchedCalls(itertools.islice(iterator, batch_size))
600 if len(tasks) == 0:
601 # No more tasks available in the iterator: tell caller to stop.
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/joblib/parallel.py in __init__()
121
122 def __init__(self, iterator_slice):
--> 123 self.items = list(iterator_slice)
124 self._size = len(self.items)
125
<ipython-input-5-beac8a572fd3> in <genexpr>()
10 with joblib.parallel_backend('distributed', scheduler_host=host):
11 y = Parallel()(delayed(model_performance)(x)
---> 12 for x in range(n))
13 return sum(y) / n
14
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/joblib/parallel.py in delayed()
177 # using with multiprocessing:
178 if check_pickle:
--> 179 pickle.dumps(function)
180
181 def delayed_function(*args, **kwargs):
PicklingError: Can't pickle <function model_performance at 0x7feb10082d08>: attribute lookup model_performance on __main__ failed
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/distributed/executor.py in _gather(self, futures, errors)
746 d['exception'],
--> 747 d['traceback'])
748 except KeyError:
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
684 if value.__traceback__ is not tb:
--> 685 raise value.with_traceback(tb)
686 raise value
<ipython-input-4-4c0de11168c5> in repeated_tests()
7 """ calls model_performance many times to get average performance """
----> 8 with joblib.parallel_backend('distributed', scheduler_host=host):
9 y = Parallel(n_jobs=-1)(delayed(model_performance)(x)
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/contextlib.py in __enter__()
58 try:
---> 59 return next(self.gen)
60 except StopIteration:
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/joblib/parallel.py in parallel_backend()
92 if isinstance(backend, _basestring):
---> 93 backend = BACKENDS[backend](**backend_params)
94 old_backend_and_jobs = getattr(_backend, 'backend_and_jobs', None)
KeyError: 'distributed'
During handling of the above exception, another exception occurred:
CancelledError Traceback (most recent call last)
<ipython-input-4-4c0de11168c5> in <module>()
20 # dask.distributed
21 b = e.map(repeated_tests, model_params)
---> 22 b = e.gather(b)
23 print(b)
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/distributed/executor.py in gather(self, futures, errors, maxsize)
826 return (self.gather(f, errors=errors) for f in futures)
827 else:
--> 828 return sync(self.loop, self._gather, futures, errors=errors)
829
830 @gen.coroutine
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
114 e.wait(1000000)
115 if error[0]:
--> 116 six.reraise(type(error[0]), error[0], traceback[0])
117 else:
118 return result[0]
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
684 if value.__traceback__ is not tb:
685 raise value.with_traceback(tb)
--> 686 raise value
687
688 else:
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/distributed/utils.py in f()
100 def f():
101 try:
--> 102 result[0] = yield gen.maybe_future(func(*args, **kwargs))
103 except Exception as exc:
104 logger.exception(exc)
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/tornado/gen.py in run(self)
1006
1007 try:
-> 1008 value = future.result()
1009 except Exception:
1010 self.had_exception = True
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py in result(self, timeout)
230 return self._result
231 if self._exc_info is not None:
--> 232 raise_exc_info(self._exc_info)
233 self._check_done()
234 return self._result
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/tornado/util.py in raise_exc_info(exc_info)
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/tornado/gen.py in run(self)
1012
1013 if exc_info is not None:
-> 1014 yielded = self.gen.throw(*exc_info)
1015 exc_info = None
1016 else:
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/distributed/executor.py in _gather(self, futures, errors)
749 six.reraise(CancelledError,
750 CancelledError(key),
--> 751 None)
752 if errors == 'skip':
753 bad_keys.add(key)
/mnt/ws/home/ssievert/anaconda3/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
684 if value.__traceback__ is not tb:
685 raise value.with_traceback(tb)
--> 686 raise value
687
688 else:
CancelledError: repeated_tests-dda91422df0262125b35e29f87030f69
---------------------------------------------------------------------------
PicklingError Traceback (most recent call last)
<ipython-input-18-39ac719d03e7> in <module>()
11 model_params = [1, 2, 3, 4]
12 performance = e.map(repeated_tests, model_params)
---> 13 performance = e.gather(performance)
14 performance = list(performance)
15 print(performance)
/Users/scott/anaconda/lib/python3.5/site-packages/distributed/executor.py in gather(self, futures, errors, maxsize)
826 return (self.gather(f, errors=errors) for f in futures)
827 else:
--> 828 return sync(self.loop, self._gather, futures, errors=errors)
829
830 @gen.coroutine
/Users/scott/anaconda/lib/python3.5/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
114 e.wait(1000000)
115 if error[0]:
--> 116 six.reraise(type(error[0]), error[0], traceback[0])
117 else:
118 return result[0]
/Users/scott/anaconda/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
684 if value.__traceback__ is not tb:
685 raise value.with_traceback(tb)
--> 686 raise value
687
688 else:
/Users/scott/anaconda/lib/python3.5/site-packages/distributed/utils.py in f()
100 def f():
101 try:
--> 102 result[0] = yield gen.maybe_future(func(*args, **kwargs))
103 except Exception as exc:
104 logger.exception(exc)
/Users/scott/anaconda/lib/python3.5/site-packages/tornado/gen.py in run(self)
1013
1014 try:
-> 1015 value = future.result()
1016 except Exception:
1017 self.had_exception = True
/Users/scott/anaconda/lib/python3.5/site-packages/tornado/concurrent.py in result(self, timeout)
235 return self._result
236 if self._exc_info is not None:
--> 237 raise_exc_info(self._exc_info)
238 self._check_done()
239 return self._result
/Users/scott/anaconda/lib/python3.5/site-packages/tornado/util.py in raise_exc_info(exc_info)
/Users/scott/anaconda/lib/python3.5/site-packages/tornado/gen.py in run(self)
1019
1020 if exc_info is not None:
-> 1021 yielded = self.gen.throw(*exc_info)
1022 exc_info = None
1023 else:
/Users/scott/anaconda/lib/python3.5/site-packages/distributed/executor.py in _gather(self, futures, errors)
745 six.reraise(type(d['exception']),
746 d['exception'],
--> 747 d['traceback'])
748 except KeyError:
749 six.reraise(CancelledError,
/Users/scott/anaconda/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
683 value = tp()
684 if value.__traceback__ is not tb:
--> 685 raise value.with_traceback(tb)
686 raise value
687
<ipython-input-18-39ac719d03e7> in repeated_tests()
4 def repeated_tests(param, n=10):
5 """ calls model_performance many times to get average performance """
----> 6 y = joblib.Parallel()(joblib.delayed(model_performance)(param) for _ in range(n))
7 print(y)
8 return sum(y) / n
/Users/scott/anaconda/lib/python3.5/site-packages/joblib/parallel.py in __call__()
756 # was dispatched. In particular this covers the edge
757 # case of Parallel used with an exhausted iterator.
--> 758 while self.dispatch_one_batch(iterator):
759 self._iterating = True
760 else:
/Users/scott/anaconda/lib/python3.5/site-packages/joblib/parallel.py in dispatch_one_batch()
601
602 with self._lock:
--> 603 tasks = BatchedCalls(itertools.islice(iterator, batch_size))
604 if len(tasks) == 0:
605 # No more tasks available in the iterator: tell caller to stop.
/Users/scott/anaconda/lib/python3.5/site-packages/joblib/parallel.py in __init__()
125
126 def __init__(self, iterator_slice):
--> 127 self.items = list(iterator_slice)
128 self._size = len(self.items)
129
<ipython-input-18-39ac719d03e7> in <genexpr>()
4 def repeated_tests(param, n=10):
5 """ calls model_performance many times to get average performance """
----> 6 y = joblib.Parallel()(joblib.delayed(model_performance)(param) for _ in range(n))
7 print(y)
8 return sum(y) / n
/Users/scott/anaconda/lib/python3.5/site-packages/joblib/parallel.py in delayed()
181 # using with multiprocessing:
182 if check_pickle:
--> 183 pickle.dumps(function)
184
185 def delayed_function(*args, **kwargs):
PicklingError: Can't pickle <function model_performance at 0x116a0aa60>: it's not the same object as __main__.model_performance
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment