Testing script to check whether missing zarr chunks read from GCS could be related to dask.distributed.
"""Check if missing chunks transferred from gcp-zarr are related to dask.""" | |
import os | |
import numpy as np | |
import xarray as xr | |
from fsspec import get_mapper | |
from dask.distributed import Client | |
OUTDIR = "/tmp" | |
TIME = "2012-01-01 00:00:00" | |
ZARRFILE = "gs://oceanum-era5/wind_10m.zarr" | |
NTRIES = 20 | |
NCPU = 1 | |

class TestXarrayDistributed:

    @classmethod
    def setup_class(cls):
        cls.client = None
        cls.iter = 0
        cls.success = []
        cls.failure = []

    @classmethod
    def teardown_class(cls):
        fname = "failure_{}-cpu_{}-tries_{}".format(NCPU, NTRIES, TIME)
        if cls.failure:
            dset = xr.concat(cls.failure, dim="iter")
            dset.to_netcdf(os.path.join(OUTDIR, fname + ".nc"))
        else:
            with open(os.path.join(OUTDIR, fname + ".txt"), "w") as stream:
                stream.write("NO FAILURES")
        fname = "success_{}-cpu_{}-tries_{}".format(NCPU, NTRIES, TIME)
        if cls.success:
            dset = xr.concat(cls.success, dim="iter")
            dset.to_netcdf(os.path.join(OUTDIR, fname + ".nc"))
        else:
            with open(os.path.join(OUTDIR, fname + ".txt"), "w") as stream:
                stream.write("NO SUCCESSES")

    def start_cluster(self):
        if NCPU > 1:
            print("Starting distributed with {} cpus".format(NCPU))
            self.client = Client()
            self.client.cluster.scale(NCPU)

    def close_cluster(self):
        if self.client is not None:
            self.client.close()

    def open_dataset(self):
        print("Opening zarr dataset: {}".format(ZARRFILE))
        fsmap = get_mapper(ZARRFILE)
        self.dset = xr.open_zarr(fsmap, consolidated=True)

    def close_dataset(self):
        if self.dset is not None:
            print("Closing zarr dataset")
            self.dset.close()
    def load_timestamp(self):
        ds = self.dset.sel(time=TIME).load()
        try:
            # Wind datasets expose u10/v10; fall back to hs for wave datasets
            dvar = np.sqrt(ds.u10**2 + ds.v10**2)
        except AttributeError:
            dvar = ds.hs
        return dvar
    def append_result(self):
        dvar = self.load_timestamp()
        if dvar.isnull().any():
            self.failure.append(dvar)
        else:
            self.success.append(dvar)
        self.iter += 1

    def test_transfer(self):
        self.start_cluster()
        self.open_dataset()
        for i in range(NTRIES):
            print("\n{}\nIterating: {}\n{}".format(88*"=", i, 88*"="))
            self.append_result()
        self.close_dataset()
        self.close_cluster()
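
The class uses the pytest xunit-style setup_class/teardown_class hooks, so it can be collected and run by pytest (for instance with pytest -s to keep the progress prints visible). The short driver below is only a sketch of how the same hooks could be invoked without pytest; it is not part of the original gist and assumes it is appended to the same file as the class above.

# Hypothetical standalone driver (a sketch, not part of the original gist):
# it calls the pytest-style hooks directly so the script can also be run
# as a plain Python program when pytest is not available.
if __name__ == "__main__":
    TestXarrayDistributed.setup_class()
    test = TestXarrayDistributed()
    try:
        test.test_transfer()
    finally:
        # Always write the success/failure summaries, even if a transfer fails
        TestXarrayDistributed.teardown_class()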