Skip to content

Instantly share code, notes, and snippets.

@pp-mo
Created March 10, 2023 15:51
Show Gist options
  • Save pp-mo/a5b1d693c4a27192e5e0e2c2c4fa5ca9 to your computer and use it in GitHub Desktop.
Save pp-mo/a5b1d693c4a27192e5e0e2c2c4fa5ca9 to your computer and use it in GitHub Desktop.
Testing for various Iris netcdf saving options, including lazy-saving and distributed scheduler
import iris
from iris.tests.stock import realistic_4d
from iris.fileformats.netcdf._dask_locks import get_dask_array_scheduler_type
import dask.array as da
import dask.config
try:
import distributed
except ImportError:
distributed = None
import numpy as np
# Just avoid a warning
iris.FUTURE.datum_support = True

# Module-level handle on the active distributed.Client, if any.
# Managed exclusively by set_scheduler().
CLIENT = None
def set_scheduler(scheduler_type: str):
    """Select the active dask scheduler, managing the global distributed Client.

    Parameters
    ----------
    scheduler_type : str
        Either "distributed" or "threads".

    Side effects
    ------------
    Creates a ``distributed.Client`` (stored in the module-global ``CLIENT``)
    when "distributed" is requested, re-using any existing one.  When
    "threads" is requested, any existing client is closed and discarded.
    Finally sets the dask config scheduler to the requested type.

    Raises
    ------
    RuntimeError
        If "distributed" is requested but the `distributed` package could
        not be imported (in which case the module-level name is None).
    """
    global CLIENT
    assert scheduler_type in ("distributed", "threads")
    if scheduler_type == "distributed":
        if distributed is None:
            # Fail with a clear message rather than an AttributeError on None.
            raise RuntimeError(
                "scheduler_type='distributed' requires the 'distributed' "
                "package, which could not be imported."
            )
        if CLIENT is None:
            CLIENT = distributed.Client()
    else:
        if CLIENT is not None:
            CLIENT.close()
            CLIENT = None
    dask.config.set(scheduler=scheduler_type)
# Flip to True to pause interactively between the main steps of each check.
# ENABLE_PAUSE = True
ENABLE_PAUSE = False


def pause():
    """If ENABLE_PAUSE is set, wait for the user to press Enter before continuing."""
    # No 'global' statement needed: ENABLE_PAUSE is only read here, never assigned.
    if ENABLE_PAUSE:
        print("Enter to continue..")
        input()
        print(".. OK.\n")
def make_lazy_datacube():
    """Build the standard 'realistic_4d' test cube, but with lazy content.

    Both the cube data and the "surface_altitude" aux-coord points are
    swapped for lazy random arrays spanning the same value range as the
    originals, chunked one slice at a time along the leading dimension.
    """
    cube = realistic_4d()

    def _lazified(real_array):
        # Replace a real array with a lazy random one of the same shape,
        # drawing values from the original array's [min, max] range.
        if real_array is None:
            return None
        lo, hi = real_array.min(), real_array.max()
        chunkshape = (1,) + tuple(real_array.shape[1:])
        return da.random.uniform(lo, hi, size=real_array.shape, chunks=chunkshape)

    cube.data = _lazified(cube.data)
    altitude_coord = cube.coord("surface_altitude")
    altitude_coord.points = _lazified(altitude_coord.points)
    return cube
def unlazify_cube(cube):
    """Realise (compute + cache) all lazy arrays attached to `cube`.

    Fetching the .data / .points / .bounds properties triggers computation
    of any lazy content; the realised results then replace the lazy arrays.
    """
    _ = cube.data  # property access realises lazy cube data
    for any_coord in cube.coords():
        _ = any_coord.points
        _ = any_coord.bounds
def surface_sample(cube):
    """Return the first 10 surface-altitude point values, computed.

    NOTE(review): uses lazy_points() rather than points — presumably to
    sample the values without realising the coord in-place; confirm against
    iris coord semantics.
    """
    lazy_altitudes = cube.coord("surface_altitude").lazy_points()
    return lazy_altitudes.flatten()[:10].compute()
def check(use_distributed=True, make_data_unlazy=False, do_lazy_save=True):
    """Run one save/readback test of Iris netcdf saving.

    Builds a (lazy) test cube, saves it to "tmp.nc" — optionally as a
    delayed ("lazy") save — and reads back the surface-altitude values to
    check whether the variable content has actually been written yet.

    Parameters
    ----------
    use_distributed : bool
        Use the "distributed" dask scheduler if True, else "threads".
    make_data_unlazy : bool
        Realise all cube content in memory before saving.
    do_lazy_save : bool
        Pass compute=False to iris.save, deferring the data writes.
    """
    print("")
    print("============================")
    msg = (
        "CHECK : "
        f"distributed={use_distributed} "
        f"allrealdata={make_data_unlazy} "
        f"lazy_save={do_lazy_save}."
    )
    print(msg)
    # Select the scheduler, then confirm Iris detects the same type.
    scheduler_type = "distributed" if use_distributed else "threads"
    set_scheduler(scheduler_type)
    detected_schedtype = get_dask_array_scheduler_type()
    print(f"Check: scheduler = {detected_schedtype!r}.")
    assert detected_schedtype == scheduler_type
    cube = make_lazy_datacube()
    if make_data_unlazy:
        unlazify_cube(cube)
    # print(cube)
    print("Cube details ...")
    print("Core data:", cube.core_data().flatten()[:5])
    print(
        "Alt coord points:",
        cube.coord("surface_altitude").core_points().flatten()[:5],
    )
    samp = surface_sample(cube)
    print("Alt coord points actual sample values", samp)
    print("")
    print(f"Save (lazy={do_lazy_save})..")
    pause()
    # A lazy save returns a delayed object; compute=False requests that.
    kwargs = {}
    if do_lazy_save:
        kwargs["compute"] = False
    lazy_save = iris.save(cube, "tmp.nc", **kwargs)
    print(".. done.")
    print("save result = ", lazy_save)
    print("")
    print("Readback altitude:")
    pause()
    readback = iris.load_cube("tmp.nc", "air_potential_temperature")
    alti_sample = surface_sample(readback)
    print(" altitude = ", alti_sample)
    # After a lazy save of lazy data, the variable should be all-masked
    # (i.e. unwritten) until the delayed save is computed.
    expect_unset = do_lazy_save and not make_data_unlazy
    assert np.all(alti_sample.mask) == expect_unset
    if do_lazy_save:
        print("")
        print("Complete the lazy save..")
        pause()
        # Computing the delayed result performs the actual data writes.
        lazy_save.compute()
        print("Readback altitude again :")
        pause()
        # DON'T need to re-load cube
        # readback = iris.load_cube("tmp.nc", "air_potential_temperature")
        alti_sample = surface_sample(readback)
        print(" altitude = ", alti_sample)
        # Now the data should be fully written: no masked points remain.
        assert np.all(alti_sample.mask == False)
if __name__ == "__main__":
    # NOTE: the scheduler is not dynamically switchable, at present.
    # USE_DISTRIBUTED_OPTIONS = (True,)
    # USE_DISTRIBUTED_OPTIONS = (False,)
    USE_DISTRIBUTED_OPTIONS = (True, False)
    DO_LAZYS = (False, True)
    # DO_LAZYS = (False,)
    # DO_LAZYS = (True,)
    # DO_LAZYS = (True, False)
    # Every combination of (distributed?, all-real-data?, lazy-save?),
    # ordered with lazy-save as the outermost (slowest-varying) option.
    test_cases = [
        (use_dist, allreal, do_lazy)
        for do_lazy in DO_LAZYS
        for use_dist in USE_DISTRIBUTED_OPTIONS
        for allreal in (False, True)
    ]
    # # override
    # test_cases = [(True, False, True)]
    for use_dist, allreal, do_lazy in test_cases:
        check(
            use_distributed=use_dist,
            make_data_unlazy=allreal,
            do_lazy_save=do_lazy,
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment