Skip to content

Instantly share code, notes, and snippets.

@cisaacstern
Created December 21, 2021 15:54
Show Gist options
  • Save cisaacstern/0399d552161d5fe91a74508e475e649a to your computer and use it in GitHub Desktop.
Save cisaacstern/0399d552161d5fe91a74508e475e649a to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
import json
import time
import signal
import sys
import fsspec
import gcsfs
import h5py
import numpy as np
import xarray as xr
from tqdm import tqdm
# Module-level setup. This runs on import and performs a network listing.
# It builds a requester-pays GCS filesystem and the gs:// URLs of every
# object listed under the cache's soda342/5day_ice prefix.
gcs = gcsfs.GCSFileSystem(requester_pays=True)
cache_base = "pangeo-forge-us-central1/pangeo-forge-cache"
all_paths = [f"gs://{entry}" for entry in gcs.ls(f"{cache_base}/soda342/5day_ice")]
class Timeout(Exception):
    """Raised by the SIGALRM handler when a single read runs past its time budget."""
def handler(signum, frame):
    """SIGALRM callback that aborts the in-progress read by raising ``Timeout``.

    ``signum`` and ``frame`` are required by the ``signal`` handler signature
    and are ignored here.
    """
    raise Timeout()
def reset_timeout(timeout):
    """(Re)arm a one-shot SIGALRM that fires after ``timeout`` seconds.

    This first installs ``handler`` for SIGALRM and then schedules the alarm.
    Scheduling replaces any alarm that is already pending, and a ``timeout``
    of 0 cancels the pending alarm without setting a new one.
    SIGALRM is Unix-only.
    """
    alarm_signal = signal.SIGALRM
    signal.signal(alarm_signal, handler)
    signal.alarm(timeout)
def read_from_cache(path, reader, storage):
    """Open one cached file and load every variable's data into memory.

    The file is opened through ``fsspec``. Each variable is then materialized
    with ``np.asarray``, so a stalled read surfaces as a hang inside this call.

    Parameters
    ----------
    path : str
        URL of the file to read (e.g. a ``gs://`` URL).
    reader : str
        Either ``"h5py"`` or ``"xarray"``; selects the library used to parse
        the file.
    storage : str
        ``"gcs"`` opens the file with ``requester_pays=True``. Any other
        value passes no extra options to ``fsspec.open``.

    Raises
    ------
    ValueError
        If ``reader`` is not ``"h5py"`` or ``"xarray"``.
    """
    # Validate before any I/O. Otherwise a typo in ``reader`` turns this into
    # a silent no-op, and callers report the file as read successfully.
    if reader not in ("h5py", "xarray"):
        raise ValueError(
            f"Unsupported reader {reader!r}; expected 'h5py' or 'xarray'."
        )
    kwargs = dict(requester_pays=True) if storage == "gcs" else {}
    with fsspec.open(path, mode="rb", **kwargs) as ofile:
        if reader == "h5py":
            # Use an explicit read-only mode, because h5py < 3.0 defaulted to "a".
            with h5py.File(ofile, mode="r") as h5file:
                for var_name in h5file.keys():
                    # Load the whole variable so its data is actually read.
                    arr = np.asarray(h5file[var_name])
                    # Free it before the next variable is allocated, so only
                    # one array is alive at a time.
                    del arr
        else:
            with xr.open_dataset(ofile) as ds:
                for var_coded in ds.variables.values():
                    # NOTE(review): presumably this mirrors the encoding step a
                    # zarr write would apply. Confirm that this is the intent.
                    var = xr.backends.zarr.encode_zarr_variable(var_coded)
                    arr = np.asarray(var.data)
                    del arr
def reader_loop(paths, reader, timeout=30, traceback=False, storage="gcs"):
    """Read each path in turn and stop at the first one that hangs.

    Each read is guarded by a SIGALRM-based timeout, which works on Unix in
    the main thread only. Python runs signal handlers between bytecode
    instructions, so a hang inside one long blocking C call is only
    interrupted once that call returns.

    Parameters
    ----------
    paths : iterable of str
        Files to read with ``read_from_cache``.
    reader : str
        Passed through to ``read_from_cache`` (``"h5py"`` or ``"xarray"``).
    timeout : int
        Seconds allowed per file before that file is reported as hanging.
    traceback : bool
        If true, a hang dumps fsspec's running async tasks. The function then
        returns the last stack line of each task instead of the hanging path.
    storage : str
        Passed through to ``read_from_cache``.

    Returns
    -------
    None if every path was read within ``timeout``. Otherwise, the path that
    hung, or (when ``traceback`` is true) a list containing the last stack
    line of each running fsspec task.
    """
    for n, path in enumerate(tqdm(paths)):
        start = time.monotonic()
        try:
            # Arm the alarm inside the try so a Timeout can never escape uncaught.
            reset_timeout(timeout)
            read_from_cache(path, reader=reader, storage=storage)
        except Timeout:
            print(
                f"\n{time.monotonic()-start} seconds have elapsed during iteration {n} "
                f"with reader '{reader}', exceeding specified `timeout` of {timeout} seconds.\n"
                f"The following path is hanging:\n{path}"
            )
            if not traceback:
                return path
            # NOTE(review): _dump_running_tasks is private fsspec API. This code
            # assumes it returns a mapping whose values are dicts that may hold
            # a "traceback.format_stack" list. Confirm against the installed
            # fsspec version.
            tasks = fsspec.asyn._dump_running_tasks(printout=False, cancel=False)
            print(json.dumps(tasks, indent=4))
            last_lines_of_trace = []
            for task_info in tasks.values():
                stack = task_info.get("traceback.format_stack")
                last_lines_of_trace.append(
                    stack[-1] if stack else "No traceback available for this task."
                )
            return last_lines_of_trace
        finally:
            # Disarm the alarm. Otherwise the alarm for the last path, or for a
            # read that failed with another exception, fires later. It would
            # then raise Timeout in unrelated code after this function returns.
            signal.alarm(0)
    print("All paths opened without hanging.")
    return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment