@rafa-guedes
Last active October 6, 2019 14:06
Test script to check whether missing zarr chunks read from GCS could be related to dask distributed
"""Check if missing chunks transferred from gcp-zarr are related to dask."""
import os
import numpy as np
import xarray as xr
from fsspec import get_mapper
from dask.distributed import Client
OUTDIR = "/tmp"
TIME = "2012-01-01 00:00:00"
ZARRFILE = "gs://oceanum-era5/wind_10m.zarr"
NTRIES = 20
NCPU = 1
class TestXarrayDistributed:
@classmethod
def setup_class(self):
self.client = None
self.iter = 0
self.success = []
self.failure = []
@classmethod
def teardown_class(self):
fname = "failure_{}-cpu_{}-tries_{}".format(NCPU, NTRIES, TIME)
if self.failure:
dset = xr.concat(self.failure, dim="iter")
dset.to_netcdf(os.path.join(OUTDIR, fname+".nc"))
else:
with open(os.path.join(OUTDIR, fname+".txt"), "w") as stream:
stream.write("NO FAILURES")
fname = "success_{}-cpu_{}-tries_{}".format(NCPU, NTRIES, TIME)
if self.success:
dset = xr.concat(self.success, dim="iter")
dset.to_netcdf(os.path.join(OUTDIR, fname+".nc"))
else:
with open(os.path.join(OUTDIR, fname+".txt"), "w") as stream:
stream.write("NO SUCCESSES")
def start_cluster(self):
if NCPU > 1:
print("Starting distributed with {} cpus".format(NCPU))
self.client = Client()
self.client.cluster.scale(NCPU)
def close_cluster(self):
if self.client is not None:
self.client.close()
def open_dataset(self):
print("Opening zarr dataset: {}".format(ZARRFILE))
fsmap = get_mapper(ZARRFILE)
self.dset = xr.open_zarr(fsmap, consolidated=True)
def close_dataset(self):
if self.dset is not None:
print("Closing zarr dataset")
self.dset.close()
def load_timestamp(self):
ds = self.dset.sel(time=TIME).load()
try:
dvar = np.sqrt(ds.u10**2 + ds.v10**2)
except:
dvar = ds.hs
return dvar
def append_result(self):
dvar = self.load_timestamp()
if dvar.isnull().any():
self.failure.append(dvar)
else:
self.success.append(dvar)
self.iter += 1
def test_transfer(self):
self.start_cluster()
self.open_dataset()
for i in range(NTRIES):
print("\n{}\nIterating: {}\n{}".format(88*"=", i, 88*"="))
self.append_result()
self.close_dataset()
self.close_cluster()
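
The class uses pytest's setup_class/teardown_class hooks, so the intended entry point is presumably running pytest on this file (with -s so the progress prints are visible). Below is a minimal sketch of driving the same flow without pytest; the __main__ guard is an illustration added here, not part of the original gist, and it assumes GCS credentials with read access to the oceanum-era5 bucket.

# Minimal sketch (assumption, not in the original gist): run the test flow
# directly without pytest.
if __name__ == "__main__":
    TestXarrayDistributed.setup_class()
    tester = TestXarrayDistributed()
    try:
        tester.test_transfer()
    finally:
        # Write the success/failure summaries even if an iteration raised.
        TestXarrayDistributed.teardown_class()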