Created
March 7, 2017 11:30
-
-
Save pp-mo/12f97ec50ebc3e6f5c90cbf6d271d41c to your computer and use it in GitHub Desktop.
Iris Lazy-only aggregations
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# N.B. currently requires/assumes "dask_timed" feature branch version of Iris (commit 1013ad9) | |
from functools import wraps | |
import dask.array as da | |
import numpy as np | |
import numpy.ma as ma | |
from iris.cube import Cube | |
from iris.coords import DimCoord | |
from iris.analysis import Aggregator, _build_dask_mdtol_function | |
from iris._lazy_data import array_masked_to_nans, is_lazy_data | |
from biggus._init import Array | |
# | |
# Q: How easy can we make it to make an aggregator from a dask function ? | |
# | |
# (1): ignore weighted types for now -- won't be hard to extend. | |
# (2): make a derived DaskAggregator(_Aggregator) class. | |
# (3): provide a statistical function f(array, axis) | |
# -- it should support/preserve NaNs for missing data. | |
# (4): we can use existing _build_dask_mdtol_function to 'add' mdtol support. | |
# (5): we will *automatically* build a 'non-lazy' version of the call. | |
# (6): we still have to provide a units conversion routine. | |
# | |
# | |
# cf. examples from iris.analysis | |
# | |
# MIN = Aggregator('minimum', ma.min) | |
# | |
# PROPORTION = Aggregator('proportion', | |
# _proportion, | |
# units_func=lambda units: 1) | |
def _array_as_lazy(array):
    """
    Return a lazy (dask) equivalent of `array`.

    Already-lazy inputs are passed through untouched; masked arrays have
    their masked points encoded as NaNs before wrapping.
    """
    # Code stolen from cube.py
    if is_lazy_data(array):
        return array
    if isinstance(array, ma.masked_array):
        # Encode masked points as NaNs, then drop the masked-array wrapper.
        array = array_masked_to_nans(array).data
    # One single chunk: chunking strategy is not a concern here.
    return da.from_array(array, chunks=array.shape)
class LazyAggregator(Aggregator):
    # An Aggregator built from a single lazy (dask) statistic function,
    # which automatically derives the equivalent 'real data' computation.
    def __init__(self, cell_method_string, dask_func,
                 units_func=None, **kwargs):
        """
        Create an aggregator based on a dask statistic function.

        The function should have the signature:
            "dask_func(dask_array, axis=None, **kwargs)".

        The aggregator will automatically support an "mdtol" keyword.
        The underlying dask function does not need to support "mdtol"
        itself, but it *should* treat NaNs as missing data in calculations.
        """
        # Preserve dask_func's name/docstring on the derived real-data
        # function, since Aggregator may report them to the user.
        @wraps(dask_func)
        def my_realdata_func(real_array, axis=None, **kwargs):
            #
            # NOTE: in this implementation, there are still two implementations
            # of the 'mdtol' behaviour : For lazy data it must be in the
            # statistic function, but for real data it is done in
            # "cube.collapsed".
            # So the 'real data function' does *not* need to support mdtol.
            #
            # Real data is promoted to lazy and pushed through the (mdtol-
            # wrapped) lazy function bound on this instance.
            lazy_array = _array_as_lazy(real_array)
            dask_result = self.lazy_func(lazy_array, axis=axis, **kwargs)
            #
            # NOTE: the translation of this *back* into cube data depends on
            # the cube, determining the eventual dtype.
            # This only works because we can (now) return a lazy array to
            # assign to cube.data "in place of" a real one -- which is what the
            # generic aggregator code does with this result.
            # Thus, it may *not* work if the original cube has a cube._dtype.
            # Also we can't really return ints, as we _should_ support masking.
            #
            return dask_result
        # Wrap the user statistic so it accepts an 'mdtol' keyword
        # (NaN-fraction threshold for masking collapsed points).
        mdtol_dask_func = _build_dask_mdtol_function(dask_func)
        super(LazyAggregator, self).__init__(cell_method=cell_method_string,
                                             call_func=my_realdata_func,
                                             units_func=units_func,
                                             lazy_func=mdtol_dask_func,
                                             **kwargs)
def dask_gmean(array, axis):
    """
    Lazy geometric mean of `array` over `axis`, treating NaNs as missing.

    Computed as exp(mean(log(x))) over the non-NaN points only.
    """
    # Count only the non-NaN points contributing along the axis.
    point_counts = da.nansum(da.where(da.isnan(array), 0, 1), axis=axis)
    # Sum the logs with nansum, skipping NaNs: a plain da.sum would
    # propagate any NaN and poison the result, which would contradict
    # the per-point counts above (they already exclude missing points)
    # and the module's NaN-as-missing contract.
    sum_logs = da.nansum(da.log(array), axis=axis)
    # exp of the mean-of-logs is the geometric mean.
    return da.exp(sum_logs / point_counts)
# Ready-made lazy geometric-mean aggregator (mdtol support added by LazyAggregator).
LAZY_GMEAN = LazyAggregator('geometric-mean', dask_gmean)
def dask_bare_COUNT(data, axis, my_boolean_function=None):
    """
    Lazily count points where `my_boolean_function(data)` is True, summed
    over `axis`, with NaN inputs treated as missing (they stay NaN in the
    result contribution).

    (Version *without* built-in mdtol support).

    Kwargs:
        my_boolean_function : required callable mapping an array to a
            boolean array of the same shape.
    """
    # Fail with a clear message rather than an opaque TypeError from
    # calling None below.
    if my_boolean_function is None:
        raise ValueError('dask_bare_COUNT requires a "my_boolean_function" '
                         'keyword.')
    # Call user function and convert True/False to 1.0/0.0.
    data_1and0 = da.where(my_boolean_function(data), 1.0, 0.0)
    # Make sure we have NaN wherever the input is NaN.
    data_1_0_nan = da.where(da.isnan(data), np.nan, data_1and0)
    # Return the number of 'True' points summed over the required axis(es).
    result_counts = da.sum(data_1_0_nan, axis=axis)
    return result_counts
# Lazy counting aggregator; pass "my_boolean_function" through cube.collapsed.
LAZY_COUNT = LazyAggregator('special-count-stat', dask_bare_COUNT)
# | |
# NOTE: there is a slight problem here. | |
# We really wanted to make the results of the COUNT statistic *ints*,
# but we currently have no good way of doing that, because of our need for NaNs | |
# to handle masking. | |
# So, we should probably add a 'dtype_func' to the aggregator constructor. | |
# Probably implemented in a "update_metadata" or "post_process" override ? | |
# | |
# SEE THAT IN THE FOLLOWING CLASS ... | |
# | |
class ImprovedLazyAggregator(LazyAggregator):
    # A LazyAggregator that can additionally force the dtype of its result.
    def __init__(self, *args, **kwargs):
        """
        Support an extra 'dtype_func' keyword to define the dtype of the
        result.

        If 'dtype_func' is defined, it has the signature:
            "dtype_func(data_result, collapsed_cube)".
        If present, this is called during the aggregator "post_process" call
        and the result is assigned as the new dtype of the cube, including
        setting "cube.dtype = None".
        """
        # Strip our extra keyword before handing the rest to the base class.
        dtype_func = kwargs.pop('dtype_func', None)
        self.dtype_func = dtype_func
        super(ImprovedLazyAggregator, self).__init__(*args, **kwargs)

    def post_process(self, collapsed_cube, data_result, coords, **kwargs):
        # Let the base class build the result cube first.
        parent_post_process = super(ImprovedLazyAggregator, self).post_process
        result_cube = parent_post_process(collapsed_cube, data_result, coords,
                                          **kwargs)
        if self.dtype_func is None:
            return result_cube
        # Override the result dtype with the user-supplied choice.
        result_cube.dtype = self.dtype_func(data_result, collapsed_cube)
        return result_cube
#
# This solution should now probably be in the main Aggregator class ?
#
# As LAZY_COUNT, but forcing an integer result dtype via 'dtype_func'
# (the dtype is fixed, so the lambda ignores both of its arguments).
INT_LAZY_COUNT = ImprovedLazyAggregator(
    'special-count-stat', dask_bare_COUNT,
    dtype_func=lambda data, cube: np.int64)
if __name__ == '__main__':
    # Simple test: a 2x3 cube with named x/y dim coords.
    cube = Cube([[1., 10., 100.], [1., 2., 4.]], units='kg')
    cube.add_dim_coord(DimCoord(np.arange(2), long_name='y'), 0)
    cube.add_dim_coord(DimCoord(np.arange(3), long_name='x'), 1)
    # Geometric mean over each axis in turn.
    # NOTE: all prints are single-argument and parenthesized, so this runs
    # identically under Python 2 and Python 3.
    gmean = cube.collapsed('x', LAZY_GMEAN)
    print('SOURCE CUBE + DATA:')
    print(cube)
    print(cube.data)
    print('gmean-x:')
    print(gmean)
    print(gmean.data)
    gmean = cube.collapsed('y', LAZY_GMEAN)
    print('gmean-y:')
    print(gmean)
    print(gmean.data)
    # Less simple example: counting with a user-supplied condition.
    counts = cube.collapsed('x', LAZY_COUNT,
                            my_boolean_function=lambda x: x > 3.7)
    print('counts-x (> 3.7):')
    print(counts)
    print(counts.data)
    counts = cube.collapsed('y', LAZY_COUNT,
                            my_boolean_function=lambda x: x < 2.5)
    print('counts-y (< 2.5):')
    print(counts)
    print(counts.data)
    # Same count, but with the result coerced to an integer dtype.
    counts = cube.collapsed('y', INT_LAZY_COUNT,
                            my_boolean_function=lambda x: x < 2.5)
    print('INT-counts-y (< 2.5):')
    print(counts)
    print(counts.data)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment