Skip to content

Instantly share code, notes, and snippets.

@andersy005
Created November 6, 2018 01:08
Show Gist options
  • Save andersy005/65988b939edd976d45247b7a1104e678 to your computer and use it in GitHub Desktop.
Save andersy005/65988b939edd976d45247b7a1104e678 to your computer and use it in GitHub Desktop.
Geotools
from __future__ import absolute_import, division, print_function
import numpy as np
import xarray as xr
#-------------------------------------------------------------------------------
#-- function
#-------------------------------------------------------------------------------
def weighted_rmsd(da_x,da_y,weights,avg_over_dims=[]):
if not avg_over_dims:
avg_over_dims = weights.dims
#-- apply NaN mask
valid = (da_x.notnull() & da_y.notnull())
weights = weights.where(valid)
return np.sqrt(weighted_mean((da_x-da_y)**2,weights,avg_over_dims,apply_nan_mask=False))
#-------------------------------------------------------------------------------
#-- function
#-------------------------------------------------------------------------------
def weighted_cov(da_x,da_y,weights,avg_over_dims=[]):
if not avg_over_dims:
avg_over_dims = weights.dims
#-- apply NaN mask
valid = (da_x.notnull() & da_y.notnull())
weights = weights.where(valid)
mean_x = weighted_mean(da_x,weights,avg_over_dims,apply_nan_mask=False)
mean_y = weighted_mean(da_y,weights,avg_over_dims,apply_nan_mask=False)
dev_x = da_x - mean_x
dev_y = da_y - mean_y
return weighted_mean(dev_x*dev_y,weights,avg_over_dims,apply_nan_mask=False)
#-------------------------------------------------------------------------------
#-- function
#-------------------------------------------------------------------------------
def weighted_cor(da_x,da_y,weights,avg_over_dims=[]):
return weighted_cov(da_x,da_y,weights,avg_over_dims) / \
np.sqrt(weighted_cov(da_x,da_x,weights,avg_over_dims) *
weighted_cov(da_y,da_y,weights,avg_over_dims))
#-------------------------------------------------------------------------------
#-- function
#-------------------------------------------------------------------------------
def weighted_sum(da,weights,sum_over_dims=[]):
if not sum_over_dims:
sum_over_dims = weights.dims
sum_over_dims_v = [k for k in sum_over_dims if k in da.dims]
if not sum_over_dims_v:
raise ValueError('Unexpected dimensions for variable {0}'.format(da.name))
attrs = da.attrs.copy()
encoding = da.encoding
#-- compute weighted sum
dao = (da * weights).sum(sum_over_dims_v)
if 'units' in attrs and 'units' in weights.attrs:
attrs['units'] = attrs['units']+' '+weights.attrs['units']
for att in ['grid_loc','coordinates']:
if att in attrs:
del attrs[att]
dao.attrs = attrs
dao.encoding = {key:val for key,val in encoding.items() if key in ['_FillValue','dtype']}
return dao
#-------------------------------------------------------------------------------
#-- function
#-------------------------------------------------------------------------------
def weighted_std(da,weights,avg_over_dims=[],apply_nan_mask=True,ddof=0):
if not avg_over_dims:
avg_over_dims = weights.dims
avg_over_dims_v = [k for k in avg_over_dims if k in da.dims]
if not avg_over_dims_v:
raise ValueError(('Unexpected dimensions for variable {0}: {1}\n\n'
'Average over dimensions: {2}').format(da.name,da,avg_over_dims))
attrs = da.attrs.copy()
encoding = da.encoding
#-- mask weights where field is missing
if apply_nan_mask:
weights = weights.where(da.notnull())
np.testing.assert_allclose((weights/weights.sum(avg_over_dims_v)).sum(avg_over_dims_v),1.)
da_mean = weighted_mean(da,weights,avg_over_dims,apply_nan_mask=False)
dao = np.sqrt((weights * (da - da_mean)**2).sum(avg_over_dims_v) / (weights.sum(avg_over_dims_v) - ddof))
for att in ['grid_loc','coordinates']:
if att in attrs:
del attrs[att]
dao.attrs = attrs
dao.encoding = {key:val for key,val in encoding.items() if key in ['_FillValue','dtype']}
return dao
#-------------------------------------------------------------------------------
#-- function
#-------------------------------------------------------------------------------
def weighted_mean(da,weights,avg_over_dims=[],apply_nan_mask=True):
if not avg_over_dims:
avg_over_dims = weights.dims
avg_over_dims_v = [k for k in avg_over_dims if k in da.dims]
if not avg_over_dims_v:
raise ValueError(('Unexpected dimensions for variable {0}: {1}\n\n'
'Average over dimensions: {2}').format(da.name,da,avg_over_dims))
attrs = da.attrs.copy()
encoding = da.encoding
#-- mask weights where field is missing (takes time)
if apply_nan_mask:
weights = weights.where(da.notnull())
np.testing.assert_allclose((weights/weights.sum(avg_over_dims_v)).sum(avg_over_dims_v),1.)
dao = (da * weights).sum(avg_over_dims_v) / weights.sum(avg_over_dims_v)
for att in ['grid_loc','coordinates']:
if att in attrs:
del attrs[att]
dao.attrs = attrs
dao.encoding = {key:val for key,val in encoding.items() if key in ['_FillValue','dtype']}
return dao
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import xarray as xr
import numpy as np
import geotools as gt
maskedarea = xr.DataArray(np.ones((10,10)),dims=('x','y'))
x = xr.DataArray(np.random.uniform(0,10,(10,10)),dims=('x','y'))
y = xr.DataArray(np.random.uniform(0,10,(10,10)),dims=('x','y'))
y[3,3:10] = np.nan
valid = (x.notnull() & y.notnull())
N = valid.sum()
x = x.where(valid)
y = y.where(valid)
x_dev = x - x.mean()
y_dev = y - y.mean()
cov = (x_dev * y_dev).sum() / N
covx = (x_dev ** 2).sum() / N
covy = (y_dev ** 2).sum() / N
cor = ( cov / np.sqrt(covx * covy) )
rmsd = np.sqrt(((x-y)**2).sum() / N )
def test_weighted_mean():
np.testing.assert_allclose(gt.weighted_mean(x,maskedarea,apply_nan_mask=True),x.mean())
def test_weighted_std():
np.testing.assert_allclose(gt.weighted_std(x,maskedarea,apply_nan_mask=True),x.std())
def test_weighted_cor():
np.testing.assert_allclose(gt.weighted_cor(x,y,maskedarea),cor)
def test_weighted_rmsd():
np.testing.assert_allclose(gt.weighted_rmsd(x,y,maskedarea),rmsd)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment