@bradyrx
Created July 4, 2018 22:04
"""
Test script for parallelization with groupby() objects in xarray.
"""
import numpy as np
import xarray as xr
from scipy import stats
def linear_regression(x):
t = range(len(x))
m, *_ = stats.linregress(t, x)
return xr.DataArray(m)
def regression_ufunc(x):
return xr.core.computation.apply_ufunc(linear_regression, x,
dask='parallelize',
input_core_dims=[['time']],
output_dtypes=[float])
def main():
# Create climate-like data
data = np.random.randn(100, 180,3 60)
lat = np.arange(-89.5, 90, 1)
lon = np.arange(0.5, 360, 1)
time = np.arange(0,100,1)
ds = xr.DataArray(data, coords=[time, lat, lon],
dims=['time', 'lat', 'lon'])
# Apply without parallelization
grouped = ds.stack(points=['lat', 'lon']).groupby('points')
m1 = grouped.apply(linear_regression).unstack('points')
# Attempt at parallelization
grouped = ds.stack(points=['lat','lon']).groupby('points')
m2 = regression_ufunc(grouped).unstack('points')
if __name__ == '__main__':
main()
@alexsalr commented Jul 5, 2018

I tested your example and made some modifications to the parallelized calculation. Let me know if you have any questions.

import numpy as np
import xarray as xr
from scipy import stats
from dask.distributed import Client

def linear_regression(x):
    t = range(len(x))
    m, *_ = stats.linregress(t, x)
    return xr.DataArray(m)

def linear_regression_p(x):
    t = range(len(x))
    m, *_ = stats.linregress(t, x)
    return m

def linear_regression_ufunc(x):
    ## Receives a 3D ndarray (lat x lon x time) from each xr.DataArray chunk
    return np.apply_along_axis(linear_regression_p, -1, x)

def regression_paralellized(ds):
    # Apply with parallelization
    return xr.core.computation.apply_ufunc(linear_regression_ufunc,
                                           ds,
                                           dask='parallelized',
                                           input_core_dims=[['time']],
                                           output_dtypes=[float])

def regression_stack_unstack(ds):
    # Apply without parallelization
    grouped = ds.stack(points=['lat', 'lon']).groupby('points')
    return grouped.apply(linear_regression).unstack('points')

def main():
    data = np.random.randn(100, 180, 360)
    lat = np.arange(-89.5, 90, 1)
    lon = np.arange(0.5, 360, 1)
    time = np.arange(0, 100, 1)
    
    ds = xr.DataArray(data, coords=[time, lat, lon], dims=['time', 'lat', 'lon'])
    
    ## Without parallelization

    m1 = regression_stack_unstack(ds)

    ## With parallelization

    # Chunk the data array so dask can process blocks of grid points in parallel
    ds_chunked = ds.chunk(chunks={'lat': 50, 'lon': 50})
    # Set up a local dask.distributed cluster
    client = Client()
    
    # Lazy operation
    m2 = regression_paralellized(ds_chunked)
    # Compute results
    m2 = m2.compute()
    
    client.close()
    
if __name__ == '__main__':
    main()
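
As a quick sanity check (not part of the comment above), the two code paths can be compared on a small array. The sketch below assumes regression_stack_unstack and regression_paralellized from the script above are in scope, and uses a reduced grid so the serial groupby loop stays cheap; no distributed Client is needed, since dask's default scheduler handles the few chunks.

import numpy as np
import xarray as xr

# Small synthetic cube: 50 time steps on a 10 x 20 grid
small = xr.DataArray(np.random.randn(50, 10, 20),
                     coords=[np.arange(50), np.arange(10), np.arange(20)],
                     dims=['time', 'lat', 'lon'])

m1 = regression_stack_unstack(small)                                        # serial groupby path
m2 = regression_paralellized(small.chunk({'lat': 5, 'lon': 5})).compute()   # chunked dask path

# unstack() may reorder the lat/lon coordinates, so align before comparing
m1, m2 = xr.align(m1, m2)
assert np.allclose(m1.values, m2.values)

Note that apply_ufunc moves the 'time' core dimension to the last axis of each block, so only 'lat' and 'lon' should be chunked.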
