Skip to content

Instantly share code, notes, and snippets.

@pp-mo
Created March 7, 2017 11:30
Show Gist options
  • Save pp-mo/12f97ec50ebc3e6f5c90cbf6d271d41c to your computer and use it in GitHub Desktop.
Save pp-mo/12f97ec50ebc3e6f5c90cbf6d271d41c to your computer and use it in GitHub Desktop.
Iris Lazy-only aggregations
# N.B. currently requires/assumes "dask_timed" feature branch version of Iris (commit 1013ad9)
from functools import wraps
import dask.array as da
import numpy as np
import numpy.ma as ma
from iris.cube import Cube
from iris.coords import DimCoord
from iris.analysis import Aggregator, _build_dask_mdtol_function
from iris._lazy_data import array_masked_to_nans, is_lazy_data
from biggus._init import Array
#
# Q: How easy can we make it to make an aggregator from a dask function ?
#
# (1): ignore weighted types for now -- won't be hard to extend.
# (2): make a derived DaskAggregator(_Aggregator) class.
# (3): provide a statistical function f(array, axis)
# -- it should support/preserve NaNs for missing data.
# (4): we can use existing _build_dask_mdtol_function to 'add' mdtol support.
# (5): we will *automatically* build a 'non-lazy' version of the call.
# (6): we still have to provide a units conversion routine.
#
#
# cf. examples from iris.analysis
#
# MIN = Aggregator('minimum', ma.min)
#
# PROPORTION = Aggregator('proportion',
# _proportion,
# units_func=lambda units: 1)
def _array_as_lazy(array):
    """Return *array* as a lazy dask array (a no-op if already lazy)."""
    # Code adapted from cube.py.
    if is_lazy_data(array):
        return array
    if isinstance(array, ma.masked_array):
        # Encode masked points as NaNs, then drop down to the plain data.
        array = array_masked_to_nans(array)
        array = array.data
    return da.from_array(array, chunks=array.shape)
class LazyAggregator(Aggregator):
    """An :class:`Aggregator` built around a dask statistic function."""

    def __init__(self, cell_method_string, dask_func,
                 units_func=None, **kwargs):
        """
        Create an aggregator based on a dask statistic function.

        The function should have the signature:
        "dask_func(dask_array, axis=None, **kwargs)".

        The aggregator will automatically support an "mdtol" keyword,
        which the underlying dask function does not itself need to
        support -- but it *should* treat NaNs as missing data in its
        calculations.
        """
        @wraps(dask_func)
        def call_with_real_data(real_array, axis=None, **kwargs):
            # NOTE: in this implementation there are still two separate
            # implementations of the 'mdtol' behaviour: for lazy data it
            # must live in the statistic function, but for real data it
            # is done in "cube.collapsed".  So this 'real data function'
            # does *not* need to support mdtol.
            as_lazy = _array_as_lazy(real_array)
            result = self.lazy_func(as_lazy, axis=axis, **kwargs)
            # NOTE: the translation of this *back* into cube data depends
            # on the cube, determining the eventual dtype.  This only
            # works because we can (now) return a lazy array to assign to
            # cube.data "in place of" a real one -- which is what the
            # generic aggregator code does with this result.
            # Thus, it may *not* work if the original cube has a
            # cube._dtype.  Also we can't really return ints, as we
            # _should_ support masking.
            return result

        # Wrap the user statistic so it honours an 'mdtol' keyword.
        mdtol_dask_func = _build_dask_mdtol_function(dask_func)
        super(LazyAggregator, self).__init__(
            cell_method=cell_method_string,
            call_func=call_with_real_data,
            units_func=units_func,
            lazy_func=mdtol_dask_func,
            **kwargs)
def dask_gmean(array, axis):
    """
    Lazily compute the geometric mean of *array* over *axis*, treating
    NaN points as missing data (as the LazyAggregator contract requires).
    """
    # Count only the non-NaN points contributing along the axis.
    point_counts = da.nansum(da.where(da.isnan(array), 0, 1), axis=axis)
    # Sum the logs *skipping* NaNs, so NaNs act as missing data.
    # (Plain da.sum would propagate NaN and defeat the counting above:
    # the count excludes NaNs, so the sum must exclude them too.)
    sum_logs = da.nansum(da.log(array), axis=axis)
    # exp(mean-of-logs) == geometric mean.
    return da.exp(sum_logs / point_counts)


LAZY_GMEAN = LazyAggregator('geometric-mean', dask_gmean)
def dask_bare_COUNT(data, axis, my_boolean_function=None):
    """
    Lazily count the points where *my_boolean_function* returns True,
    summed over *axis*.

    (Version *without* built-in mdtol support.)
    """
    # Map the user function's True/False results onto 1.0/0.0.
    ones_and_zeros = da.where(my_boolean_function(data), 1.0, 0.0)
    # Re-instate NaN wherever the input is NaN.
    ones_zeros_nans = da.where(da.isnan(data), np.nan, ones_and_zeros)
    # Return the number of 'True' points summed over the required axis(es).
    return da.sum(ones_zeros_nans, axis=axis)


LAZY_COUNT = LazyAggregator('special-count-stat', dask_bare_COUNT)
#
# NOTE: there is a slight problem here.
# We really wanted to make the results of the COUNT statistic *ints*,
# but we currently have no good way of doing that, because of our need for NaNs
# to handle masking.
# So, we should probably add a 'dtype_func' to the aggregator constructor.
# Probably implemented in a "update_metadata" or "post_process" override ?
#
# SEE THAT IN THE FOLLOWING CLASS ...
#
class ImprovedLazyAggregator(LazyAggregator):
    """A LazyAggregator with optional control over the result dtype."""

    def __init__(self, *args, **kwargs):
        """
        Support an extra 'dtype_func' keyword to define the dtype of the
        result.

        If defined, 'dtype_func' has the signature:
        "dtype_func(data_result, collapsed_cube)".

        If present, it is called during the aggregator "post_process"
        call, and its result is assigned as the new dtype of the result
        cube.
        """
        # Remove our extra keyword before the parent sees the rest.
        self.dtype_func = kwargs.pop('dtype_func', None)
        super(ImprovedLazyAggregator, self).__init__(*args, **kwargs)

    def post_process(self, collapsed_cube, data_result, coords, **kwargs):
        result_cube = super(ImprovedLazyAggregator, self).post_process(
            collapsed_cube, data_result, coords, **kwargs)
        if self.dtype_func is not None:
            # Override the result cube's dtype as directed.
            result_cube.dtype = self.dtype_func(data_result, collapsed_cube)
        return result_cube
#
# This solution should now probably be in the main Aggregator class ?
#
# A COUNT aggregator whose results are delivered with an int64 dtype.
INT_LAZY_COUNT = ImprovedLazyAggregator(
    'special-count-stat',
    dask_bare_COUNT,
    dtype_func=lambda data, cube: np.int64,
)
if __name__ == '__main__':
# Simple test
cube = Cube([[1., 10., 100.], [1., 2., 4.]], units='kg')
cube.add_dim_coord(DimCoord(np.arange(2), long_name='y'), 0)
cube.add_dim_coord(DimCoord(np.arange(3), long_name='x'), 1)
gmean = cube.collapsed('x', LAZY_GMEAN)
print
print 'SOURCE CUBE + DATA:'
print cube
print cube.data
print
print 'gmean-x:'
print gmean
print gmean.data
gmean = cube.collapsed('y', LAZY_GMEAN)
print
print 'gmean-y:'
print gmean
print gmean.data
# Less simple example.
counts = cube.collapsed('x', LAZY_COUNT,
my_boolean_function=lambda x: x > 3.7)
print
print 'counts-x (> 3.7):'
print counts
print counts.data
counts = cube.collapsed('y', LAZY_COUNT,
my_boolean_function=lambda x: x < 2.5)
print
print 'counts-y (< 2.5):'
print counts
print counts.data
counts = cube.collapsed('y', INT_LAZY_COUNT,
my_boolean_function=lambda x: x < 2.5)
print
print 'INT-counts-y (< 2.5):'
print counts
print counts.data
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment