Skip to content

Instantly share code, notes, and snippets.

@jhamman
Created January 6, 2018 19:04
Show Gist options
  • Save jhamman/25ddda993ad5b768e4b8289904be6779 to your computer and use it in GitHub Desktop.
xarray/zarr debugging
---------------------------------------------------------------------------
ConnectionError Traceback (most recent call last)
<timed eval> in <module>()
/pool0/data/jhamman1/projects/pangeo/xarray/xarray/core/dataset.py in to_zarr(self, store, mode, synchronizer, group, encoding)
1165 from ..backends.api import to_zarr
1166 return to_zarr(self, store=store, mode=mode, synchronizer=synchronizer,
-> 1167 group=group, encoding=encoding)
1168
1169 def __unicode__(self):
/pool0/data/jhamman1/projects/pangeo/xarray/xarray/backends/api.py in to_zarr(dataset, store, mode, synchronizer, group, encoding)
738 # I think zarr stores should always be sync'd immediately
739 # TODO: figure out how to properly handle unlimited_dims
--> 740 dataset.dump_to_store(store, sync=True, encoding=encoding)
741 return store
/pool0/data/jhamman1/projects/pangeo/xarray/xarray/core/dataset.py in dump_to_store(self, store, encoder, sync, encoding, unlimited_dims)
1070 unlimited_dims=unlimited_dims)
1071 if sync:
-> 1072 store.sync()
1073
1074 def to_netcdf(self, path=None, mode='w', format=None, group=None,
/pool0/data/jhamman1/projects/pangeo/xarray/xarray/backends/common.py in sync(self)
222
223 def sync(self):
--> 224 self.writer.sync()
225
226 def store_dataset(self, dataset):
/pool0/data/jhamman1/projects/pangeo/xarray/xarray/backends/common.py in sync(self)
188 if self.sources:
189 import dask.array as da
--> 190 da.store(self.sources, self.targets, lock=self.lock)
191 self.sources = []
192 self.targets = []
~/anaconda/envs/pangeo/lib/python3.6/site-packages/dask/array/core.py in store(sources, targets, lock, regions, compute, **kwargs)
898 dsk = sharedict.merge((name, updates), *[src.dask for src in sources])
899 if compute:
--> 900 compute_as_if_collection(Array, dsk, keys, **kwargs)
901 else:
902 from ..delayed import Delayed
~/anaconda/envs/pangeo/lib/python3.6/site-packages/dask/base.py in compute_as_if_collection(cls, dsk, keys, get, **kwargs)
210 get = get or _globals['get'] or cls.__dask_scheduler__
211 dsk2 = optimization_function(cls)(ensure_dict(dsk), keys, **kwargs)
--> 212 return get(dsk2, keys, **kwargs)
213
214
~/anaconda/envs/pangeo/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, **kwargs)
1997 secede()
1998 try:
-> 1999 results = self.gather(packed, asynchronous=asynchronous)
2000 finally:
2001 for f in futures.values():
~/anaconda/envs/pangeo/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
1435 return self.sync(self._gather, futures, errors=errors,
1436 direct=direct, local_worker=local_worker,
-> 1437 asynchronous=asynchronous)
1438
1439 @gen.coroutine
~/anaconda/envs/pangeo/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
590 return future
591 else:
--> 592 return sync(self.loop, func, *args, **kwargs)
593
594 def __repr__(self):
~/anaconda/envs/pangeo/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
252 e.wait(1000000)
253 if error[0]:
--> 254 six.reraise(*error[0])
255 else:
256 return result[0]
~/anaconda/envs/pangeo/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
691 if value.__traceback__ is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None
~/anaconda/envs/pangeo/lib/python3.6/site-packages/distributed/utils.py in f()
236 yield gen.moment
237 thread_state.asynchronous = True
--> 238 result[0] = yield make_coro()
239 except Exception as exc:
240 logger.exception(exc)
~/anaconda/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py in run(self)
1053
1054 try:
-> 1055 value = future.result()
1056 except Exception:
1057 self.had_exception = True
~/anaconda/envs/pangeo/lib/python3.6/site-packages/tornado/concurrent.py in result(self, timeout)
236 if self._exc_info is not None:
237 try:
--> 238 raise_exc_info(self._exc_info)
239 finally:
240 self = None
~/anaconda/envs/pangeo/lib/python3.6/site-packages/tornado/util.py in raise_exc_info(exc_info)
~/anaconda/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py in run(self)
1061 if exc_info is not None:
1062 try:
-> 1063 yielded = self.gen.throw(*exc_info)
1064 finally:
1065 # Break up a reference to itself
~/anaconda/envs/pangeo/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1313 six.reraise(type(exception),
1314 exception,
-> 1315 traceback)
1316 if errors == 'skip':
1317 bad_keys.add(key)
~/anaconda/envs/pangeo/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
690 value = tp()
691 if value.__traceback__ is not tb:
--> 692 raise value.with_traceback(tb)
693 raise value
694 finally:
~/anaconda/envs/pangeo/lib/python3.6/site-packages/dask/array/core.py in store()
2393 try:
2394 if region is None:
-> 2395 out[index] = np.asanyarray(x)
2396 else:
2397 out[fuse_slice(region, index)] = np.asanyarray(x)
/pool0/data/jhamman1/projects/pangeo/zarr/zarr/core.py in __setitem__()
1100
1101 fields, selection = pop_fields(selection)
-> 1102 self.set_basic_selection(selection, value, fields=fields)
1103
1104 def set_basic_selection(self, selection, value, fields=None):
/pool0/data/jhamman1/projects/pangeo/zarr/zarr/core.py in set_basic_selection()
1195 return self._set_basic_selection_zd(selection, value, fields=fields)
1196 else:
-> 1197 return self._set_basic_selection_nd(selection, value, fields=fields)
1198
1199 def set_orthogonal_selection(self, selection, value, fields=None):
/pool0/data/jhamman1/projects/pangeo/zarr/zarr/core.py in _set_basic_selection_nd()
1486 indexer = BasicIndexer(selection, self)
1487
-> 1488 self._set_selection(indexer, value, fields=fields)
1489
1490 def _set_selection(self, indexer, value, fields=None):
/pool0/data/jhamman1/projects/pangeo/zarr/zarr/core.py in _set_selection()
1534
1535 # put data
-> 1536 self._chunk_setitem(chunk_coords, chunk_selection, chunk_value, fields=fields)
1537
1538 def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection,
/pool0/data/jhamman1/projects/pangeo/zarr/zarr/core.py in _chunk_setitem()
1642 with lock:
1643 self._chunk_setitem_nosync(chunk_coords, chunk_selection, value,
-> 1644 fields=fields)
1645
1646 def _chunk_setitem_nosync(self, chunk_coords, chunk_selection, value, fields=None):
/pool0/data/jhamman1/projects/pangeo/zarr/zarr/core.py in _chunk_setitem_nosync()
1719
1720 # store
-> 1721 self.chunk_store[ckey] = cdata
1722
1723 def _chunk_key(self, chunk_coords):
/pool0/data/jhamman1/projects/pangeo/gcsfs/gcsfs/mapping.py in __setitem__()
74 key = self._key_to_str(key)
75 with self.gcs.open(key, 'wb') as f:
---> 76 f.write(value)
77
78 def keys(self):
/pool0/data/jhamman1/projects/pangeo/gcsfs/gcsfs/core.py in write()
871 self.loc += out
872 if self.buffer.tell() >= self.blocksize:
--> 873 self.flush()
874 return out
875
/pool0/data/jhamman1/projects/pangeo/gcsfs/gcsfs/core.py in flush()
901 self._initiate_upload()
902 if self.location is not None:
--> 903 self._upload_chunk(final=force)
904 if force:
905 self.forced = True
/pool0/data/jhamman1/projects/pangeo/gcsfs/gcsfs/core.py in _upload_chunk()
925 r = self.gcsfs.session.post(
926 self.location, params={'uploadType': 'resumable'},
--> 927 headers=head, data=data)
928 validate_response(r, self.location)
929 if 'Range' in r.headers:
~/anaconda/envs/pangeo/lib/python3.6/site-packages/requests/sessions.py in post()
553 """
554
--> 555 return self.request('POST', url, data=data, json=json, **kwargs)
556
557 def put(self, url, data=None, **kwargs):
~/anaconda/envs/pangeo/lib/python3.6/site-packages/requests/sessions.py in request()
506 }
507 send_kwargs.update(settings)
--> 508 resp = self.send(prep, **send_kwargs)
509
510 return resp
~/anaconda/envs/pangeo/lib/python3.6/site-packages/requests/sessions.py in send()
616
617 # Send the request
--> 618 r = adapter.send(request, **kwargs)
619
620 # Total elapsed time of the request (approximately)
~/anaconda/envs/pangeo/lib/python3.6/site-packages/requests/adapters.py in send()
488
489 except (ProtocolError, socket.error) as err:
--> 490 raise ConnectionError(err, request=request)
491
492 except MaxRetryError as e:
ConnectionError: ('Connection aborted.', OSError("(32, 'EPIPE')",))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment