
@AlexHilson
Created April 21, 2020 19:38
MDDA + Iris example
'''
this was hacked together, sorry if it doesn't work... let me know :)
gdal + iris are best installed with conda.
gdal is used for a neat trick to read the grib array straight into memory.
that's not strictly necessary - if you wrote the data to a temp file you
could probably use iris-grib instead.
If you're using this seriously, set your chunk size with care. e.g. if you
create a big cube with, say, 1x1x2560x1920 chunks, then try to pull 1 point
from 33 chunks in order to build a vertical profile, you will download a lot
of data.
If you're trying to do something small, like extracting a vertical profile,
you're probably best off making the request to MDDA directly, saving the
result, and loading it into Iris (see the sketch just after the imports).
The method below is best if you need to process a lot of the data - e.g. to
run statistical processing.
'''
import time
import json
import requests
import io
import uuid
import tempfile
import os
from getpass import getpass
from datetime import datetime
import iris
import gdal
import dask
import dask.array
import numpy as np
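'''
sketch of the "just request it directly" alternative mentioned in the
docstring above. this is an untested illustration, not part of the original
workflow: it re-uses the same /cube request shape as get_grib_chunk below,
grabs a small box around one point for a set of levels, writes the GRIB
response to a temp file and loads it with Iris. the function name, the 0.25
degree box and the argument names are illustrative assumptions.
'''
def profile_to_cubes(param_id, t_value, z_values, lon, lat, delta=0.25):
    body = {
        'outputFormat': 'GRIB2',
        'parameters': [param_id],
        'subset': {
            # small box around the point of interest (assumed convention,
            # same bound / selection style as the cube requests below)
            'x': {'lowerBound': lon - delta, 'upperBound': lon + delta},
            'y': {'lowerBound': lat - delta, 'upperBound': lat + delta},
            'z': {'selection': list(z_values)},
            't': {'selection': [t_value]},
        }
    }
    response = requests.post(
        f'{BASE_URL}/collections/{collection}/{instance}/cube',
        json=body,
        headers={'x-api-key': api_key})
    response.raise_for_status()
    with tempfile.NamedTemporaryFile(suffix='.grib2') as fp:
        fp.write(response.content)
        fp.flush()
        cubes = iris.load(fp.name)
        for c in cubes:
            c.data  # realise the data before the temp file is deleted
    return cubes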
'''
set up junk
'''
try:
    BASE_URL = os.environ['MDDA_BASE_URL']
except KeyError:
    BASE_URL = input('mdda url: ')
try:
    api_key = os.environ['MDDA_API_KEY']
except KeyError:
    api_key = getpass('api key: ')
# hard coded collection id for pressure data
collection = 'mo-global-isbl'
collection_metadata = requests.get(
    f'{BASE_URL}/collections/{collection}',
    headers={'x-api-key': api_key})
collection_metadata.raise_for_status()
collection_metadata = collection_metadata.json()
# avoid the latest model run to ensure there's a full set of data
instance = collection_metadata['instances'][1]['instanceId']
print(instance)
instance_metadata = requests.get(
    f'{BASE_URL}/collections/{collection}/{instance}',
    headers={'x-api-key': api_key})
instance_metadata.raise_for_status()
instance_metadata = instance_metadata.json()
# let's just assume temperature is available
sample_param = instance_metadata['parameters']['temperature']
z_axis = sample_param['coordinates']['z']
t_axis = sample_param['coordinates']['t']
print(f'number of vertical level coordinates: {len(z_axis)}')
print(f'number of time axis coordinates: {len(t_axis)}')
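# the 'temperature' key above is an assumption - if it's missing, the
# parameter ids actually available in this instance can be listed from the
# same metadata (quick illustrative check, not in the original run):
print(f"available parameters: {list(instance_metadata['parameters'])}")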
def get_grib_chunk(param, z_index, t_index, xmin=-180, xmax=180, ymin=-90, ymax=90):
    # lazy hack: accept either a single time value or a list of them
    if not isinstance(t_index, list):
        t_index = [t_index]
    body = {
        'outputFormat': 'GRIB2',
        'parameters': [param],
        'subset': {
            'x': {'lowerBound': xmin, 'upperBound': xmax},
            'y': {'lowerBound': ymin, 'upperBound': ymax},
            'z': {'selection': [z_index]},
            't': {'selection': t_index}
        }
    }
    response = requests.post(
        f'{BASE_URL}/collections/{collection}/{instance}/cube',
        json=body,
        headers={'x-api-key': api_key})
    response.raise_for_status()
    return response.content
def grib_to_array(binary_content):
    '''
    Use GDAL memory mapping to get numpy array without landing
    data. No idea if this is efficient or not but it's neat
    '''
    # print('array computed...')
    fname = f'/vsimem/{uuid.uuid1()}.grib2'
    gdal.FileFromMemBuffer(fname, binary_content)
    ds = gdal.Open(fname).ReadAsArray()
    return ds
def grib_to_cube(binary_content):
    '''
    Load some grib from memory into an in-memory cube.
    E.g. for validating / checking what the coord types are etc
    '''
    with tempfile.NamedTemporaryFile(suffix='.grib2') as fp:
        fp.write(binary_content)
        fp.flush()
        cube = iris.load_cube(fp.name)
        cube.data  # realise the data before the temp file is deleted
    return cube
def arr_to_cube(standard_name, units, x_dim_coord, y_dim_coord, t_aux_coord, z_aux_coord, arr):
    '''
    simplify iris cube creation signature a bit
    '''
    return iris.cube.Cube(
        data=arr,
        standard_name=standard_name,
        units=units,
        dim_coords_and_dims=[(y_dim_coord, 0), (x_dim_coord, 1)],
        aux_coords_and_dims=[(z_aux_coord, None), (t_aux_coord, None)]
    )
'''
ok, those are the functions needed to do stuff. here are some test runs
'''
print('--- non lazy cube test ---')
print('get data from MDDA, load into memory, load into Iris')
data = get_grib_chunk(sample_param['parameterId'], z_axis[0], t_axis[0])
arr = grib_to_array(data)
cube = grib_to_cube(data)
print(repr(cube))
'''
we get the extra metadata we need from the sample file.
this lets us set up the lazy version, where we don't have the data on
hand to start with
'''
chunk_shape = arr.shape
chunk_dtype = arr.dtype
units = cube.units
standard_name = cube.standard_name
lat_coord = cube.coord('latitude')
lon_coord = cube.coord('longitude')
def time_dim_coord_factory(point):
    return iris.coords.DimCoord(
        standard_name='time',
        points=[point],
        units='seconds since 1970-01-01 00:00:00')
def level_dim_coord_factory(point):
    # despite the name this returns an AuxCoord - the pressure level is
    # attached to each 2D cube as a scalar auxiliary coordinate
    return iris.coords.AuxCoord(
        points=[point],
        long_name='pressure',
        units='Pa')
'''
delayed test run
'''
print('---- lazy cube test ----')
'''
convert axis values from mdda format to Iris format
'''
mdda_t = t_axis[0]
mdda_z = z_axis[0]
iris_t = time_dim_coord_factory(
    datetime.strptime(mdda_t, "%Y-%m-%dT%H:%M:%S%z").timestamp())
iris_z = level_dim_coord_factory(mdda_z)
@dask.delayed
def get_delayed_chunk(param_id, z_point, t_point):
    chunk = get_grib_chunk(param_id, z_point, t_point)
    arr = grib_to_array(chunk)
    return arr
delayed_arr = dask.array.from_delayed(
    get_delayed_chunk(
        sample_param['parameterId'],
        mdda_z,
        mdda_t),
    shape=chunk_shape,
    dtype=chunk_dtype)
cube2 = arr_to_cube(
    arr=delayed_arr,
    standard_name=standard_name,
    units=units,
    x_dim_coord=lon_coord,
    y_dim_coord=lat_coord,
    t_aux_coord=iris_t,
    z_aux_coord=iris_z)
print(cube2)
print(f'cube has lazy data: {cube2.has_lazy_data()}')
print(f'first 10 points: {cube2[:10, 0].data}')
'''
merge
'''
print('---- big lazy cube test ----')
'''
repeat the above step for every time step and level,
then let Iris work out how to merge it all into a big cube.
it's not pretty but it works, and creating / merging virtual cubes is cheap
'''
cubes = []
for mdda_t in t_axis:
    iris_t = time_dim_coord_factory(
        datetime.strptime(mdda_t, "%Y-%m-%dT%H:%M:%S%z").timestamp())
    for mdda_z in z_axis:
        iris_z = level_dim_coord_factory(mdda_z)
        delayed_arr = dask.array.from_delayed(
            get_delayed_chunk(
                sample_param['parameterId'],
                mdda_z,
                mdda_t),
            shape=chunk_shape,
            dtype=chunk_dtype)
        cubes.append(arr_to_cube(
            arr=delayed_arr,
            standard_name=standard_name,
            units=units,
            x_dim_coord=lon_coord,
            y_dim_coord=lat_coord,
            t_aux_coord=iris_t,
            z_aux_coord=iris_z))
big_cube = iris.cube.CubeList(cubes).merge_cube()
print(big_cube)
print(f'big cube has lazy data: {big_cube.has_lazy_data()}')
print(f'underlying structure: {big_cube.lazy_data()}')
print(
    "running cube[:, -1].collapsed(['latitude', 'longitude'], iris.analysis.MAX)")
start = time.time()
max_val = big_cube[:, -1].collapsed(
    ['latitude', 'longitude'], iris.analysis.MAX)
n_times = len(max_val.coord('time').points)
last_level = next(max_val.coord('pressure').cells())
print(
    f'max temperature for {n_times} timesteps at {last_level} Pa: {np.around(max_val.data, decimals=3)} K')
end = time.time()
print(f'took {round(end - start, 1)} seconds to compute')
@JohnCrickett

Could you use https://docs.python.org/3.7/library/io.html#io.BytesIO to get rid of the named temp file and instead push the response straight into Iris?

@AlexHilson (Author) commented Apr 22, 2020

I actually wrote this a few months back so I could be mis-remembering, but I'm pretty sure I tried BytesIO. As far as I can remember the Iris load signature takes either a filename or an array; there's no support for a file handle. So you either end up using a temporary file or finding some other tool to load the array first. That's why GDAL is being used here; there are some other libraries around, like cf-grib, that looked viable. Unfortunately what I've done here with GDAL is actually invalid - it's reading the values incorrectly (I'd guess it's not accounting for the scaled values in the grib file).

So I'm going to update this gist to replace GDAL with temp files throughout, which isn't ideal. At some point I'll investigate other options; maybe they're viable now.
