Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
#!/usr/bin/env python
# coding: utf-8
# Extract specific NWM time series from NWM reanalysis
# data on AWS
import xarray as xr
import fsspec
import dask
from dask.distributed import Client
# Public NOAA National Water Model v2 retrospective dataset, stored as zarr on S3.
url = 's3://noaa-nwm-retro-v2-zarr-pds'
# Anonymous (unauthenticated) S3 access — the bucket is public.
fs = fsspec.filesystem('s3', anon=True)
# Open lazily via the consolidated zarr metadata (one metadata read instead of many).
ds = xr.open_zarr(fs.get_mapper(url), consolidated=True)
# specify a subset region: boolean mask over reaches inside a lat/lon box
# (41-51N, 75-62W — presumably the Gulf of Maine region, matching the output
# path used below; confirm against the intended study area)
idx = (ds.latitude > 41.0) & (ds.latitude < 51.0) & (ds.longitude > -75.0) & (ds.longitude < -62.0)
# specify a time period: keep only streamflow, masked reaches, from 2000-01-01 onward
ds_out = ds[['streamflow']].isel(feature_id=idx).sel(time=slice('2000-01-01',None))
# generate the encoding dictionary of chunks,
def gchunks(ds_chunk, chunks):
    """Return a zarr encoding dict giving every variable a chunk shape.

    For each variable, the chunk size along a dimension is the requested
    size from ``chunks`` (clamped to the dimension's length), or the full
    dimension length for dimensions not listed in ``chunks``.

    Side effect: each variable in ``ds_chunk`` is re-chunked in place to
    match its encoding, so the dask chunks agree with what ``to_zarr``
    will write.

    Parameters:
        ds_chunk: an xarray Dataset (any mapping of variables with
            ``.dims`` and ``.chunk`` works)
        chunks (dict): requested chunk size per dimension name,
            e.g. ``{'time': 10, 'lat': 20}``

    Returns:
        dict: ``{var_name: {'chunks': tuple_of_chunk_sizes}}`` suitable
        for the ``encoding=`` argument of ``Dataset.to_zarr``.
    """
    group_chunks = {}
    for var in ds_chunk.variables:
        sizes = []
        for dim in ds_chunk[var].dims:
            dim_len = len(ds_chunk[dim])
            # Requested chunk clamped to the dimension length; dimensions
            # not mentioned in `chunks` get a single full-length chunk.
            sizes.append(min(chunks[dim], dim_len) if dim in chunks else dim_len)
        # Re-chunk in place so the dask layout matches the zarr encoding.
        ds_chunk[var] = ds_chunk[var].chunk(tuple(sizes))
        group_chunks[var] = {'chunks': tuple(sizes)}
    return group_chunks
# choose the chunking for the 'time' and 'feature_id' dimensions
# (672 presumably = 28 days x 24 hourly steps — confirm the data cadence;
# 10000 reaches per chunk)
encoding = gchunks(ds_out, {'time': 672, 'feature_id': 10000})

if __name__ == '__main__':
    # Start a local dask.distributed cluster; it registers itself as the
    # default scheduler, so the to_zarr computation runs on its workers.
    client = Client()
    # Write the subset to a local zarr store, overwriting any existing store.
    ds_out.to_zarr('./zarr/gulf_of_maine2', mode='w', encoding=encoding)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment