Skip to content

Instantly share code, notes, and snippets.

@gforsyth
Created February 19, 2021 21:55
Show Gist options
  • Save gforsyth/f3425d6bd37e6f0b68ba46dbf8121efd to your computer and use it in GitHub Desktop.
Save gforsyth/f3425d6bd37e6f0b68ba46dbf8121efd to your computer and use it in GitHub Desktop.
Dask DataFrame column selection stack trace: KeyError "('x',)" raised from ddf.head()
distributed.worker - WARNING - Compute Failed
Function: subgraph_callable
args: ( id name x y
timestamp
2000-01-01 00:00:00 996 George 0.828987 -0.929548
2000-01-01 00:00:01 1000 Jerry -0.110702 0.714628
2000-01-01 00:00:02 1018 Wendy 0.626610 -0.723174
2000-01-01 00:00:03 998 Norbert -0.249345 0.829807
2000-01-01 00:00:04 983 Patricia -0.614237 0.045530
... ... ... ... ...
2000-01-01 23:59:55 968 George 0.607650 -0.601929
2000-01-01 23:59:56 1015 Ursula -0.131636 -0.947113
2000-01-01 23:59:57 997 Quinn 0.864542 0.200530
2000-01-01 23:59:58 990 Patricia 0.675835 -0.233165
2000-01-01 23:59:59 952 Charlie 0.656843 0.612066
[86400 rows x 4 columns], "('x',)")
kwargs: {}
Exception: KeyError("('x',)")
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
<ipython-input-6-4e0da72f8a21> in <module>
----> 1 ddf.head()
~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/dask/dataframe/core.py in head(self, n, npartitions, compute)
1040 Whether to compute the result, default is True.
1041 """
-> 1042 return self._head(n=n, npartitions=npartitions, compute=compute, safe=True)
1043
1044 def _head(self, n, npartitions, compute, safe):
~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/dask/dataframe/core.py in _head(self, n, npartitions, compute, safe)
1073
1074 if compute:
-> 1075 result = result.compute()
1076 return result
1077
~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
279 dask.base.compute
280 """
--> 281 (result,) = compute(self, traverse=False, **kwargs)
282 return result
283
~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
561 postcomputes.append(x.__dask_postcompute__())
562
--> 563 results = schedule(dsk, keys, **kwargs)
564 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
565
~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2653 should_rejoin = False
2654 try:
-> 2655 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2656 finally:
2657 for f in futures.values():
~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1962 else:
1963 local_worker = None
-> 1964 return self.sync(
1965 self._gather,
1966 futures,
~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
836 return future
837 else:
--> 838 return sync(
839 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
840 )
~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
338 if error[0]:
339 typ, exc, tb = error[0]
--> 340 raise exc.with_traceback(tb)
341 else:
342 return result[0]
~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/distributed/utils.py in f()
322 if callback_timeout is not None:
323 future = asyncio.wait_for(future, callback_timeout)
--> 324 result[0] = yield future
325 except Exception as exc:
326 error[0] = sys.exc_info()
~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/tornado/gen.py in run(self)
760
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1827 exc = CancelledError(key)
1828 else:
-> 1829 raise exception.with_traceback(traceback)
1830 raise exc
1831 if errors == "skip":
~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/dask/optimization.py in __call__()
961 if not len(args) == len(self.inkeys):
962 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 963 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
964
965 def __reduce__(self):
~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/dask/core.py in get()
149 for key in toposort(dsk):
150 task = dsk[key]
--> 151 result = _execute_task(task, cache)
152 cache[key] = result
153 result = _execute_task(out, cache)
~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/dask/core.py in _execute_task()
119 # temporaries by their reference count and can execute certain
120 # operations in-place.
--> 121 return func(*(_execute_task(a, cache) for a in args))
122 elif not ishashable(arg):
123 return arg
~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/pandas/core/frame.py in __getitem__()
2798 if self.columns.nlevels > 1:
2799 return self._getitem_multilevel(key)
-> 2800 indexer = self.columns.get_loc(key)
2801 if is_integer(indexer):
2802 indexer = [indexer]
~/miniforge3/envs/dd_col_test/lib/python3.8/site-packages/pandas/core/indexes/base.py in get_loc()
2646 return self._engine.get_loc(key)
2647 except KeyError:
-> 2648 return self._engine.get_loc(self._maybe_cast_indexer(key))
2649 indexer = self.get_indexer([key], method=method, tolerance=tolerance)
2650 if indexer.ndim > 1 or indexer.size > 1:
pandas/_libs/index.pyx in pandas._libs.index.IndexEngine.get_loc()
pandas/_libs/index.pyx in pandas._libs.index.IndexEngine.get_loc()
pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.PyObjectHashTable.get_item()
pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.PyObjectHashTable.get_item()
KeyError: "('x',)"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment