Created
April 21, 2020 19:38
-
-
Save AlexHilson/97cfaf77e6c96556a52092d94e704ada to your computer and use it in GitHub Desktop.
MDDA + Iris example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
this was hacked together sorry if it doesn't work... let me know :) | |
gdal + iris are best installed with conda | |
gdal is used for a neat trick to read the grib array straight into memory | |
that's not strictly necessary - if you wrote the data to a temp file you | |
could probably use iris-grib instead. | |
If you're using this seriously set your chunk size with care. e.g. if you | |
create a big cube with, say, 1x1x2560x1920 chunks, then try to pull 1 point from 33 chunks | |
in order to create a vertical profile you will download a lot of data.
If you're trying to do something small like, extract a vertical profile, you're probably | |
best off making the request to MDDA directly, saving the result, and loading it into Iris. | |
The method below is best if you need to process lots of the data - e.g. to run
statistical processing. | |
''' | |
import io
import json
import os
import tempfile
import time
import uuid
from datetime import datetime
from getpass import getpass

import dask
import dask.array
import gdal
import iris
import numpy as np
import requests
'''
set up junk
'''
# Base URL and API key come from the environment when available,
# otherwise prompt interactively (getpass keeps the key off-screen).
try:
    BASE_URL = os.environ['MDDA_BASE_URL']
except KeyError:
    BASE_URL = input('mdda url: ')
try:
    api_key = os.environ['MDDA_API_KEY']
except KeyError:
    api_key = getpass('api key: ')
# hard coded collection id for pressure data
collection = 'mo-global-isbl'
collection_metadata = requests.get(
    f'{BASE_URL}/collections/{collection}',
    headers={'x-api-key': api_key})
collection_metadata.raise_for_status()
# BUG FIX: Response.json is a method and must be called. The original
# assigned the bound method itself, so the ['instances'] subscript below
# would raise "'method' object is not subscriptable".
collection_metadata = collection_metadata.json()
# avoid the latest model run to ensure there's a full set of data
instance = collection_metadata['instances'][1]['instanceId']
print(instance)
instance_metadata = requests.get(
    f'{BASE_URL}/collections/{collection}/{instance}',
    headers={'x-api-key': api_key})
instance_metadata.raise_for_status()
instance_metadata = instance_metadata.json()
# lets just assume temperature is available
sample_param = instance_metadata['parameters']['temperature']
z_axis = sample_param['coordinates']['z']
t_axis = sample_param['coordinates']['t']
print(f'number of vertical level coordinates: {len(z_axis)}')
print(f'number of time axis coordinates: {len(t_axis)}')
def get_grib_chunk(param, z_index, t_index, xmin=-180, xmax=180, ymin=-90, ymax=90):
    '''
    Request one GRIB2 chunk from the MDDA /cube endpoint.

    param: MDDA parameter id string.
    z_index: a single vertical-axis value.
    t_index: a single time-axis value, or a list of them to fetch
        several timesteps in one request.
    xmin/xmax/ymin/ymax: bounding box in degrees; defaults to the globe.

    Returns the raw GRIB2 payload as bytes.
    Raises requests.HTTPError on a non-2xx response.
    '''
    # Normalise a scalar time selection into the list the API expects.
    # (isinstance is the idiomatic form of the original type() != list check.)
    if not isinstance(t_index, list):
        t_index = [t_index]
    body = {
        'outputFormat': 'GRIB2',
        'parameters': [param],
        'subset': {
            'x': {'lowerBound': xmin, 'upperBound': xmax},
            'y': {'lowerBound': ymin, 'upperBound': ymax},
            'z': {'selection': [z_index]},
            't': {'selection': t_index}
        }
    }
    response = requests.post(
        f'{BASE_URL}/collections/{collection}/{instance}/cube',
        json=body,
        headers={'x-api-key': api_key})
    response.raise_for_status()
    return response.content
def grib_to_array(binary_content):
    '''
    Decode a GRIB2 payload to a numpy array via GDAL's in-memory
    filesystem, so the bytes never touch disk.

    binary_content: raw GRIB2 bytes.
    Returns whatever gdal ReadAsArray produces for the payload
    (a numpy array of the gridded values).

    BUG FIX: the original never released the /vsimem file, leaking the
    whole payload in memory on every call; gdal.Unlink now frees it.
    '''
    fname = f'/vsimem/{uuid.uuid1()}.grib2'
    gdal.FileFromMemBuffer(fname, binary_content)
    try:
        arr = gdal.Open(fname).ReadAsArray()
    finally:
        # Remove the virtual file even if GDAL fails to open/read it.
        gdal.Unlink(fname)
    return arr
def grib_to_cube(binary_content):
    '''
    Load GRIB2 bytes into a fully-realised in-memory Iris cube,
    e.g. for validating / inspecting the coordinate types.

    binary_content: raw GRIB2 bytes.
    Returns an iris Cube with its data loaded.
    '''
    with tempfile.NamedTemporaryFile(suffix='.grib2') as grib_file:
        grib_file.write(binary_content)
        grib_file.flush()
        loaded = iris.load_cube(grib_file.name)
        # Touch .data to force the lazy load NOW, while the backing
        # temp file still exists.
        loaded.data
    return loaded
def arr_to_cube(standard_name, units, x_dim_coord, y_dim_coord, t_aux_coord, z_aux_coord, arr):
    '''
    Thin wrapper that tidies the iris Cube constructor call: a 2-D
    (lat, lon) array plus scalar time/pressure aux coords in, Cube out.
    '''
    dim_coords = [(y_dim_coord, 0), (x_dim_coord, 1)]
    scalar_coords = [(z_aux_coord, None), (t_aux_coord, None)]
    return iris.cube.Cube(
        data=arr,
        standard_name=standard_name,
        units=units,
        dim_coords_and_dims=dim_coords,
        aux_coords_and_dims=scalar_coords,
    )
'''
ok those are the functions needed to do stuff. here's some test runs
'''
print('--- non lazy cube test ---')
print('get data from MDDA, load into memory, load into Iris')
# Fetch a single (level, time) chunk and decode it both ways:
# GDAL gives a plain numpy array, Iris gives a cube with metadata.
data = get_grib_chunk(sample_param['parameterId'], z_axis[0], t_axis[0])
arr = grib_to_array(data)
cube = grib_to_cube(data)
print(repr(cube))
'''
we get the extra metadata we need from the sample file.
this lets us set up the lazy version, where we don't have the data on
hand to start with
'''
# Shape/dtype of one chunk — reused later so lazy dask arrays can be
# declared up front without downloading anything.
chunk_shape = arr.shape
chunk_dtype = arr.dtype
units = cube.units
standard_name = cube.standard_name
lat_coord = cube.coord('latitude')
lon_coord = cube.coord('longitude')
def time_dim_coord_factory(point):
    '''Build a one-point Iris time DimCoord from a unix timestamp.'''
    epoch_units = 'seconds since 1970-01-01 00:00:00'
    return iris.coords.DimCoord(
        points=[point],
        standard_name='time',
        units=epoch_units)
def level_dim_coord_factory(point):
    '''
    Build a one-point pressure coordinate in Pa.
    NOTE(review): despite the name this returns an AuxCoord, not a
    DimCoord — which is what arr_to_cube's aux_coords slot expects.
    '''
    pressure_units = 'Pa'
    return iris.coords.AuxCoord(
        long_name='pressure',
        units=pressure_units,
        points=[point])
'''
delayed test run
'''
print('---- lazy cube test ----')
'''
convert axis values from mdda format to Iris format
'''
# MDDA serves time as an ISO-8601 string with a UTC offset (hence the
# %z in the format); Iris wants a numeric epoch-seconds coordinate.
mdda_t = t_axis[0]
mdda_z = z_axis[0]
iris_t = time_dim_coord_factory(
    datetime.strptime(mdda_t, "%Y-%m-%dT%H:%M:%S%z").timestamp())
iris_z = level_dim_coord_factory(mdda_z)
@dask.delayed
def get_delayed_chunk(param_id, z_point, t_point):
    '''
    Deferred fetch-and-decode of one chunk: nothing is downloaded
    until dask actually computes the result.
    '''
    grib_bytes = get_grib_chunk(param_id, z_point, t_point)
    return grib_to_array(grib_bytes)
'''
Build a lazy cube for a single chunk: the dask array promises data of
the known shape/dtype but downloads nothing until .data is touched.
NOTE: dask.array is a submodule and must be imported explicitly
(`import dask.array`, added to the import block) — a bare
`import dask` leaves `dask.array` unavailable and this line would
raise AttributeError.
'''
delayed_arr = dask.array.from_delayed(
    get_delayed_chunk(
        sample_param['parameterId'],
        mdda_z,
        mdda_t),
    shape=chunk_shape,
    dtype=chunk_dtype)
cube2 = arr_to_cube(
    arr=delayed_arr,
    standard_name=standard_name,
    units=units,
    x_dim_coord=lon_coord,
    y_dim_coord=lat_coord,
    t_aux_coord=iris_t,
    z_aux_coord=iris_z)
print(cube2)
print(f'cube has lazy data: {cube2.has_lazy_data()}')
# Slicing then touching .data only realises the sliced portion.
print(f'first 10 points: {cube2[:10, 0].data}')
'''
merge
'''
print('---- big lazy cube test ----')
'''
repeat the above step for every time step and level.
then let Iris work out how to merge that into a big cube
it's not pretty but it works and creating / merging virtual cubes is cheap
'''
# One single-chunk lazy cube per (time, level) pair; merge_cube then
# combines them into a single multi-dimensional cube.
cubes = []
for mdda_t in t_axis:
    iris_t = time_dim_coord_factory(
        datetime.strptime(mdda_t, "%Y-%m-%dT%H:%M:%S%z").timestamp())
    for mdda_z in z_axis:
        iris_z = level_dim_coord_factory(mdda_z)
        delayed_arr = dask.array.from_delayed(
            get_delayed_chunk(
                sample_param['parameterId'],
                mdda_z,
                mdda_t),
            shape=chunk_shape,
            dtype=chunk_dtype)
        cubes.append(arr_to_cube(
            arr=delayed_arr,
            standard_name=standard_name,
            units=units,
            x_dim_coord=lon_coord,
            y_dim_coord=lat_coord,
            t_aux_coord=iris_t,
            z_aux_coord=iris_z))
big_cube = iris.cube.CubeList(cubes).merge_cube()
print(big_cube)
print(f'big cube has lazy data: {big_cube.has_lazy_data()}')
print(f'underlying structure: {big_cube.lazy_data()}')
print(
    "running cube[:, -1].collapsed(['latitude', 'longitude'], iris.analysis.MAX)")
# Slicing to the last level first means only those chunks are actually
# fetched when .data is evaluated below.
start = time.time()
max_val = big_cube[:, -1].collapsed(
    ['latitude', 'longitude'], iris.analysis.MAX)
n_times = len(max_val.coord('time').points)
# After the slice the pressure coord has a single cell: the last level.
last_level = next(max_val.coord('pressure').cells())
print(
    f'max temperature for {n_times} timesteps at {last_level} Pa: {np.around(max_val.data, decimals=3)} K')
end = time.time()
print(f'took {round(end - start, 1)} seconds to compute')
I actually wrote this a few months back so could be mis-remembering but pretty sure I tried BytesIO. As far as I can remember the Iris signature is either a filename or an array, there's no support for a file handle. So either end up using a temporary file or finding some other tool to load the array first. That's why GDAL is being used here, there are some other libraries around like cf-grib that looked viable. Unfortunately what I've done here with GDAL is actually invalid, it's reading the values incorrectly (I'd guess not accounting for scaled value in the grib file).
So I'm going to update this gist to replace GDAL with temp files throughout, which isn't ideal. At some point will investigate other options, maybe they're now viable.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Could you use https://docs.python.org/3.7/library/io.html#io.BytesIO to get rid of the named temp file and instead push the response straight into Iris?