Created
December 21, 2021 15:54
-
-
Save cisaacstern/0399d552161d5fe91a74508e475e649a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import time | |
import signal | |
import sys | |
import fsspec | |
import gcsfs | |
import h5py | |
import numpy as np | |
import xarray as xr | |
from tqdm import tqdm | |
# Requester-pays GCS filesystem, used to enumerate the cached input files.
gcs = gcsfs.GCSFileSystem(requester_pays=True)
cache_base = "pangeo-forge-us-central1/pangeo-forge-cache"
# `gcs.ls` yields scheme-less "bucket/key" entries; re-attach "gs://" so each
# entry is a full URL that `fsspec.open` can route to GCS.
all_paths = [f"gs://{entry}" for entry in gcs.ls(f"{cache_base}/soda342/5day_ice")]
class Timeout(Exception):
    """Signals that reading a single path exceeded its SIGALRM time budget."""
def handler(signum, frame):
    """SIGALRM handler: convert the delivered alarm into a ``Timeout`` exception.

    Args:
        signum: The signal number (not used).
        frame: The interrupted stack frame (not used).

    Raises:
        Timeout: Unconditionally, so the pending read is aborted.
    """
    raise Timeout()
def reset_timeout(timeout):
    """Install ``handler`` for SIGALRM and schedule an alarm ``timeout`` seconds out.

    When the alarm fires, ``handler`` raises ``Timeout`` in the main thread.
    NOTE(review): SIGALRM / ``signal.alarm`` are POSIX-only per the ``signal``
    module docs, so this does not work on Windows.
    """
    alarm_sig = signal.SIGALRM
    # The handler must be in place before the alarm is scheduled.
    signal.signal(alarm_sig, handler)
    signal.alarm(timeout)
def read_from_cache(path, reader, storage):
    """Open ``path`` with fsspec and force every variable to be read into memory.

    The data itself is discarded; the point is to exercise the full read path
    so that a hanging file can be detected by the caller's timeout.

    Args:
        path: URL/path passed to ``fsspec.open``.
        reader: ``"h5py"`` to read each top-level HDF5 member with h5py, or
            ``"xarray"`` to open the file with ``xr.open_dataset`` and read
            each (Zarr-encoded) variable.
        storage: ``"gcs"`` opens the file with ``requester_pays=True``; any
            other value passes no extra options to ``fsspec.open``.

    Raises:
        ValueError: If ``reader`` is not one of the supported names. This is
            checked before any I/O, so a typo no longer silently reads nothing.
    """
    if reader not in ("h5py", "xarray"):
        raise ValueError(
            f"Unsupported reader {reader!r}; expected 'h5py' or 'xarray'."
        )
    kwargs = dict(requester_pays=True) if storage == "gcs" else {}
    with fsspec.open(path, mode="rb", **kwargs) as ofile:
        if reader == "h5py":
            with h5py.File(ofile) as h5file:
                for var_name in h5file:
                    arr = np.asarray(h5file[var_name])
                    # Only the act of reading matters; drop the array
                    # immediately to keep peak memory low.
                    del arr
        else:
            with xr.open_dataset(ofile) as ds:
                for var_coded in ds.variables.values():
                    # NOTE(review): encode_zarr_variable lives in xarray's
                    # backend internals, not its documented public API;
                    # confirm it exists in the installed xarray version.
                    var = xr.backends.zarr.encode_zarr_variable(var_coded)
                    arr = np.asarray(var.data)
                    del arr
def _running_task_last_lines():
    """Print fsspec's running async tasks as JSON; return each task's last stack line."""
    # NOTE(review): `_dump_running_tasks` is a private fsspec helper and may
    # change between fsspec versions.
    tasks = fsspec.asyn._dump_running_tasks(printout=False, cancel=False)
    print(json.dumps(tasks, indent=4))
    return [
        task["traceback.format_stack"][-1]
        if "traceback.format_stack" in task
        else "No traceback available for this task."
        for task in tasks.values()
    ]


def reader_loop(paths, reader, timeout=30, traceback=False, storage="gcs"):
    """Read each path under a SIGALRM watchdog and report the first one that hangs.

    Args:
        paths: Sequence of paths/URLs, each passed to ``read_from_cache``.
        reader: Reader name forwarded to ``read_from_cache``.
        timeout: Per-path time budget in seconds (passed to ``signal.alarm``).
        traceback: If true, on a hang also print fsspec's running async tasks
            and return their last stack lines instead of the path.
        storage: Forwarded to ``read_from_cache``.

    Returns:
        ``None`` if every path was read within ``timeout``. Otherwise, the
        hanging path, or, when ``traceback`` is true, a list holding the last
        stack line of each running fsspec task.

    The alarm is cancelled after every read, and the previous SIGALRM handler
    is restored on exit, so no stale alarm can raise ``Timeout`` after this
    function returns or while tqdm updates between reads.
    """
    previous_handler = signal.getsignal(signal.SIGALRM)
    try:
        for n, path in enumerate(tqdm(paths)):
            start = time.time()
            reset_timeout(timeout)
            try:
                read_from_cache(path, reader=reader, storage=storage)
            except Timeout:
                print(
                    f"\n{time.time()-start} seconds have elapsed during iteration {n} "
                    f"with reader '{reader}', exceeding specified `timeout` of {timeout} seconds.\n"
                    f"The following path is hanging:\n{path}"
                )
                if traceback:
                    return _running_task_last_lines()
                return path
            finally:
                # Disarm the watchdog. Previously the alarm for the last path
                # (or for a path that raised some other error) stayed pending
                # and could raise Timeout later in unrelated code.
                signal.alarm(0)
    finally:
        # getsignal() returns None for handlers not installed from Python;
        # those cannot be reinstalled via signal.signal().
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)
    print("All paths opened without hanging.")
    return None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment