Skip to content

Instantly share code, notes, and snippets.

@gmaze
Created February 12, 2020 12:26
Show Gist options
  • Save gmaze/0b45d925c8492e0c0daaed654e2e1965 to your computer and use it in GitHub Desktop.
Save gmaze/0b45d925c8492e0c0daaed654e2e1965 to your computer and use it in GitHub Desktop.
#~/usr/bin/env python
#
# Useful functions for xarray time series analysis
# (c) G. Maze, Ifremer
#
import numpy as np
import xarray as xr
import pandas as pd
from statsmodels.tsa.seasonal import seasonal_decompose
from scipy.signal import detrend, butter, lfilter
from sklearn.linear_model import LinearRegression
from scipy import signal
from scipy import interpolate
from scipy.fftpack import rfft, irfft, fftfreq
import holoviews as hv
from holoviews.streams import Stream, param, Params
import panel as pn
def vrange(x):
return np.nanmin(x), np.nanmax(x)
def apply_to_two_periods(da, periods=[1, 2], reduce=np.mean, apply=np.subtract):
"""For a given year, reduce to 2 periods and then apply a function to them
Ex: Compute mean(JJA) minus mean(JFM)
apply_to_two_periods(da, periods=[[6,7,8] , [1,2,3]], reduce=np.mean, apply=np.subtract)
"""
perA = periods[0]
perB = periods[1]
iperA = np.isin(da['time.month'].values, perA, assume_unique=False)
iperB = np.isin(da['time.month'].values, perB, assume_unique=False)
valA = da.isel(time=iperA).reduce(reduce)
valB = da.isel(time=iperB).reduce(reduce)
return apply(valA, valB)
def apply_to_one_period(da, period=[1, 2, 3], reduce=np.mean):
"""For a given year, reduce to 1 period and then apply a function to it
Ex:
Compute annual time series with mean(JJA):
apply_to_one_period(da, period=[6,7,8], reduce=np.mean)
"""
iperA = np.isin(da['time.month'].values, period, assume_unique=False)
return da.isel(time=iperA).reduce(reduce)
def monthly_to_annual(da, window=None, fct=np.mean, periods=[1, 2], center='01-01'):
"""Sub-sample an inter-annual monthly time series using specific operators and time windows
Examples:
Annual maximum:
monthly_to_annual(da, window='year', fct=np.max)
Annual maximum over JFM only:
monthly_to_annual(da, window='period', periods=[1,2,3], fct=np.max)
Annual mean(JJA):
monthly_to_annual(da, window='period', periods=[6,7,8], fct=np.mean)
Annual mean(JJA) - mean(JFM):
monthly_to_annual(da, window='delta', periods=[[6,7,8],[1,2,3]], fct=np.mean)
March of the following year versus September of this year:
monthly_to_annual(da, window='forward', periods=[9,3], fct=np.mean)
This is like: f(Y+1,Mar.) - f(Y,Sep.), output centered on Y
JFM of this year versus JJA of the previous year:
monthly_to_annual(da, window='backward', periods=[[6,7,8],[1,2,3]], fct=np.mean)
This is like: f(Y,Jan./Feb./Mar.) - f(Y-1,Jun./Jul./Aug.), output centered on Y
New time series on centered on January 1st by default, you change to another
day of the year with 'center':
monthly_to_annual(da, window='year', center='06-15') # Center on June 15th
gmaze@ifremer.fr
"""
# New time axis centered on January 1st:
new_time_axis = np.arange(np.datetime64(pd.to_datetime(str(da['time'].values[0])).strftime('%Y')),
np.datetime64(pd.to_datetime(str(da['time'].values[-1])).strftime('%Y'))\
+ np.timedelta64(1, '1Y'),
np.timedelta64(1, '1Y'))
# center = '06-01' # June 1st
new_time_axis = np.array(
[np.datetime64(pd.to_datetime(i + '-' + center)) for i in pd.to_datetime(new_time_axis).strftime('%Y')])
if window == 'year':
# By year:
x_sub = da.groupby('time.year').apply(fct)
x_sub['year'] = new_time_axis
elif window == 'period':
# By year and period for each years:
x_sub = da.groupby('time.year').apply(apply_to_one_period, period=periods, reduce=fct)
x_sub['year'] = new_time_axis
elif window == 'delta':
# By year and 2 periods for each years:
x_sub = da.groupby('time.year').apply(apply_to_two_periods, periods=periods, reduce=np.mean, apply=np.subtract)
x_sub['year'] = new_time_axis
elif window == 'forward':
# By year/period and the next year/period:
x_center = da.groupby('time.year').apply(apply_to_one_period, period=periods[0], reduce=fct)
x_forward = da.groupby('time.year').apply(apply_to_one_period, period=periods[1], reduce=fct)
x_forward = x_forward[1:]
x_forward['year'] = x_forward['year'] - 1
x_sub = x_forward - x_center
x_sub['year'] = new_time_axis[0:-1]
elif window == 'backward':
# By year/period and the previous year/period:
x_backward = da.groupby('time.year').apply(apply_to_one_period, period=periods[0], reduce=fct)
x_backward = x_backward[0:-1]
x_backward['year'] = x_backward['year'] + 1
x_center = da.groupby('time.year').apply(apply_to_one_period, period=periods[1], reduce=fct)
x_sub = x_center - x_backward
x_sub['year'] = new_time_axis[1:]
else:
raise Exception('Unknown window code: {}'.format(window))
# Adjust time axis:
x_sub = x_sub.rename({'year': 'time'})
return x_sub
def get_seascyc(da, freq='M', tile=True):
""" Compute a typical seasonal cycle from a monthly xr.DataArray time series
The Seas.Cyc. is computed as the mean of eah months for which annual means
have been removed.
Input
-----
xr.DataArray
A monthly time series with a 'time' axis
Returns
-------
xr.DataArray
the typical seasonal cycle repeated along the orginial time axis
gmaze@ifremer.fr
"""
ts = da.copy(deep=True) # Working copy of the observed time series
# Remove annual means:
yr_mean = ts.groupby('time.year').mean('time') # Annual mean time series
ts_yan = ts.values # To hold the time series with annual means removed
years = [int(str(i)) for i in da['time'].values.astype('datetime64[Y]')]
for y in yr_mean['year']:
v = yr_mean.sel(year=y).values
mask = np.argwhere(years == y.values)
ts_yan[mask] = ts_yan[mask] - np.tile(v, mask.shape)
ts_yan = xr.DataArray(ts_yan, dims='time', coords={'time': ts['time'].values})
# Compute typical seasonal cycle:
if freq == 'M':
sc = ts_yan.groupby('time.month').mean('time')
seq = ts['time'].values.astype('datetime64[M]').astype(int) % 12 + 1
elif freq == 'W':
sc = ts_yan.groupby('time.week').mean('time')
seq = ts['time'].values.astype('datetime64[W]').astype(int) % 52 + 1
elif freq == 'D':
sc = ts_yan.groupby('time.dayofyear').mean('time')
seq = ts['time'].values.astype('datetime64[D]').astype(int) % 365 + 1
# Broadcast to full time series:
if tile:
ts_sc = np.empty((len(ts['time']),))
axis = sc[sc.dims[0]].values
for this, ii in zip(axis, np.arange(0, len(axis))):
fill_value = sc[ii].values
mask = np.argwhere(seq == this)
ts_sc[mask] = np.tile(fill_value, len(mask))[:, np.newaxis]
# Output
return xr.DataArray(ts_sc, dims='time', coords={'time': ts['time'].values}).rename('seasonal')
else:
# Output
return sc.rename('seasonal')
def freq_to_scperiod(freq):
"""Return one annual cycle length for a given Pandas time index
Parameters
----------
freq : str or offset
Frequency to convert
Returns
-------
period : int
Periodicity of freq
Notes
-----
Annual maps to 1, quarterly maps to 4, monthly to 12, weekly to 52.
https://github.com/statsmodels/statsmodels/issues/2856
"""
freq = freq.rule_code.upper()
if freq == 'A' or freq.startswith(('A-', 'AS-')):
return 1
elif freq == 'Q' or freq.startswith(('Q-', 'QS-')):
return 4
elif freq == 'M' or freq.startswith(('M-', 'MS')):
return 12
elif freq == 'W' or freq.startswith('W-'):
return 52
elif freq == 'D':
return 365
elif freq == 'H':
return 365 * 24
else: # pragma : no cover
raise ValueError("freq {} not understood. Please report if you "
"think this is in error.".format(freq))
def decompose_ts(da, freq='M'):
"""Seasonal decomposition using moving averages of a xr.DataArray time series
Input
-----
xr.DataArray
A time series with a 'time' axis.
freq: str
Time series frequency: 'Y': annual M': monthly, 'D': daily
Returns
-------
xr.DataSet
A dataset with the input time series additive decomposition
"""
# Construct a pandas time series with the xarray.DataArray :
dt, c = '30 days', 1
if freq is 'Y': dt, c = '365 day', 1
if freq is 'D': dt, c = '1 day', 0
index = pd.date_range(
pd.to_datetime(str(da['time'].values[0])).strftime('%Y-%m-%d'),
(pd.to_datetime(str(da['time'].values[-1])) + c * pd.Timedelta(dt)).strftime('%Y-%m-%d'), freq=freq)
data = pd.Series(da.values, index=index)
# Then decompose:
result = seasonal_decompose(data, model='additive', extrapolate_trend='freq',
freq=freq_to_scperiod(data.index.freq))
# Create a new xarray.dataset with all signals:
r = result.trend.to_xarray().rename('trend').to_dataset()
r['seasonal'] = result.seasonal.to_xarray()
r['resid'] = result.resid.to_xarray()
r['observed'] = result.observed.to_xarray()
r = r.rename({'index': 'time'})
return r
def decompose_tsia(da, freq='M', cutoff=5):
"""Additive decomposition of a xr.DataArray time series
Decompose a time series into a:
- Seasonal Cycle
- Linear trend
- Low Frequency (lower than cutoff)
- Interannual (from 1 year to cutoff)
- Residual
Filtering is done with a simple moving average
Returns
-------
xr.DataSet
A dataset with the input time series additive decomposition
gmaze@ifremer.fr
"""
# Seasonal cycle length for a given frequency:
if freq == 'Y': w = 1
if freq == 'M': w = 12
if freq == 'W': w = 52
if freq == 'D': w = 365
# Init output DataSet:
RES = xr.DataArray(da.values,
dims='time',
coords={'time': da['time'].values},
name='observed').to_dataset()
# 1st extract the seasonal cycle:
if freq != 'Y':
RES['seasonal'] = get_seascyc(da, freq=freq, tile=True)
else:
RES['seasonal'] = xr.DataArray(np.zeros_like(RES['observed'].values), dims='time')
# 2nd: Remove linear trend:
y = RES['observed'].values - RES['seasonal'].values
x = np.arange(0, len(y))
iok = np.argwhere(~np.isnan(y))
model = LinearRegression().fit(x[iok], y[iok])
y_pred = model.predict(x[:, np.newaxis]).squeeze()
RES['linear_trend'] = xr.DataArray(y_pred, dims='time')
# 3rd: Very Low Frequency filter out above 'cutoff' years:
y = RES['observed'].values - RES['seasonal'].values - RES['linear_trend'].values
y_pred = xr.DataArray(y, dims='time').rolling(time=cutoff * w, center=True).mean()
RES['low_freq'] = y_pred
# 4th: Interannual
y = RES['observed'].values - RES['seasonal'].values - RES['linear_trend'].values \
- RES['low_freq'].values
y_pred = xr.DataArray(y, dims='time').rolling(time=w, center=True).mean()
RES['interannual'] = y_pred
# Residual:
y = RES['observed'].values - RES['seasonal'].values - RES['linear_trend'].values \
- RES['low_freq'].values - RES['interannual'].values
y_pred = y # Residual !
RES['residual'] = xr.DataArray(y_pred, dims='time')
# Return new xarray.dataset with all signal components:
return RES
def plot_decompose_tsia(tsa, grid='split', width=600, height=200):
w, h = width, height
if 'interannual' in tsa:
if grid == 'split':
layout = (tsa['observed'].hvplot(label='Observed').opts(show_grid=True, width=w, height=h)
+ tsa['linear_trend'].hvplot(label='Linear Trend').opts(show_grid=True, width=w, height=h)
+ tsa['low_freq'].hvplot(label='Low Frequency').opts(show_grid=True, width=w, height=h)
+ tsa['interannual'].hvplot(label='Interannual').opts(show_grid=True, width=w, height=h)
+ tsa['seasonal'].hvplot(label='Seasonal').opts(show_grid=True, width=w, height=h)
+ tsa['residual'].hvplot(label='Residual').opts(show_grid=True, width=w, height=h)
)
layout.cols(2)
if grid == 'classic':
layout = (tsa['observed'].hvplot(label='Observed').opts(show_grid=True, width=w, height=h)
+ tsa['seasonal'].hvplot(label='Seasonal').opts(show_grid=True, width=w, height=h)
+ tsa['linear_trend'].hvplot(label='Linear Trend').opts(show_grid=True, width=w, height=h)
* tsa['low_freq'].hvplot(label='Low Frequency').opts(show_grid=True, width=w, height=h)
* tsa['interannual'].hvplot(label='Interannual').opts(show_grid=True, width=w, height=h)
+ tsa['residual'].hvplot(label='Residual').opts(show_grid=True, width=w, height=h)
)
layout.cols(2)
if grid == 'pair':
layout = (tsa['observed'].hvplot(label='Observed').opts(show_grid=True, width=w, height=h)
* tsa['linear_trend'].hvplot(label='Linear Trend').opts(show_grid=True, width=w, height=h)
+ tsa['low_freq'].hvplot(label='Low Frequency').opts(show_grid=True, width=w, height=h)
* tsa['interannual'].hvplot(label='Interannual').opts(show_grid=True, width=w, height=h)
+ tsa['seasonal'].hvplot(label='Seasonal').opts(show_grid=True, width=w, height=h)
* tsa['residual'].hvplot(label='Residual').opts(show_grid=True, width=w, height=h)
)
layout.cols(2)
if grid == 'mix':
layout = (tsa['observed'].hvplot(label='Observed').opts(show_grid=True, width=w, height=h)
* (tsa['seasonal'] + tsa['linear_trend']).hvplot(label='Linear Trend + Seasonal').opts(show_grid=True,
width=w,
height=h)
+ tsa['low_freq'].hvplot(label='Low Frequency').opts(show_grid=True, width=w, height=h)
+ tsa['residual'].hvplot(label='Residual').opts(show_grid=True, width=w, height=h)
)
layout.cols(2)
else:
if grid == 'split':
layout = (tsa['observed'].hvplot(label='Observed').opts(show_grid=True, width=w, height=h)
+ tsa['linear_trend'].hvplot(label='Linear Trend').opts(show_grid=True, width=w, height=h)
+ tsa['low_freq'].hvplot(label='Low Frequency').opts(show_grid=True, width=w, height=h)
+ tsa['seasonal'].hvplot(label='Seasonal').opts(show_grid=True, width=w, height=h)
+ tsa['residual'].hvplot(label='Residual').opts(show_grid=True, width=w, height=h)
)
layout.cols(2)
if grid == 'classic':
layout = (tsa['observed'].hvplot(label='Observed').opts(show_grid=True, width=w, height=h)
+ tsa['seasonal'].hvplot(label='Seasonal').opts(show_grid=True, width=w, height=h)
+ tsa['linear_trend'].hvplot(label='Linear Trend').opts(show_grid=True, width=w, height=h)
* tsa['low_freq'].hvplot(label='Low Frequency').opts(show_grid=True, width=w, height=h)
+ tsa['residual'].hvplot(label='Residual').opts(show_grid=True, width=w, height=h)
)
layout.cols(2)
if grid == 'pair':
layout = (tsa['observed'].hvplot(label='Observed').opts(show_grid=True, width=w, height=h)
* tsa['linear_trend'].hvplot(label='Linear Trend').opts(show_grid=True, width=w, height=h)
+ tsa['low_freq'].hvplot(label='Low Frequency').opts(show_grid=True, width=w, height=h)
+ tsa['seasonal'].hvplot(label='Seasonal').opts(show_grid=True, width=w, height=h)
* tsa['residual'].hvplot(label='Residual').opts(show_grid=True, width=w, height=h)
)
layout.cols(2)
if grid == 'mix':
layout = (tsa['observed'].hvplot(label='Observed').opts(show_grid=True, width=w, height=h)
* (tsa['seasonal'] + tsa['linear_trend']).hvplot(label='Linear Trend + Seasonal').opts(show_grid=True,
width=w,
height=h)
+ tsa['low_freq'].hvplot(label='Low Frequency').opts(show_grid=True, width=w, height=h)
+ tsa['residual'].hvplot(label='Residual').opts(show_grid=True, width=w, height=h)
)
layout.cols(2)
return layout
class tsia_decomposer:
""" Time series decomposition """
def __init__(self, \
filter_type='Hanning', \
window_size=5 * 12, \
cutoff=10 * 12, \
windowIA_size=2 * 12, \
cutoffIA=1 * 12, \
freq='M', \
fill_borders=False):
self.filter_type = filter_type
self.window_size = window_size
self.cutoff = cutoff
self.windowIA_size = windowIA_size
self.cutoffIA = cutoffIA
self.freq = freq
self.fill = fill_borders
def low_pass_weights_ma(self, window):
w = np.repeat(1.0, window) / window
return w
def low_pass_weights_hanning(self, window):
w = signal.hann(window)
w = w / sum(w)
return w
def low_pass_weights_lanczos(self, window, cutoff):
order = ((window - 1) // 2) + 1
nwts = 2 * order + 1
w = np.zeros([nwts])
n = nwts // 2
w[n] = 2 * cutoff
k = np.arange(1., n)
sigma = np.sin(np.pi * k / n) * n / (np.pi * k)
firstfactor = np.sin(2. * np.pi * cutoff * k) / (np.pi * k)
w[n - 1:0:-1] = firstfactor * sigma
w[n + 1:-1] = firstfactor * sigma
return w[1:-1]
def low_pass_fft(self, y, window):
sig = y.values
iok = ~np.isnan(sig)
sig = sig[iok]
N, fs = sig.size, 1
tim = np.arange(N) / fs
W = fftfreq(sig.size, d=tim[1] - tim[0])
f_signal = rfft(sig)
cut_f_signal = f_signal.copy()
cut_f_signal[(W > 1 / window)] = 0
cut_signal = irfft(cut_f_signal)
y.values[iok] = cut_signal
return y
def low_pass_butter(self, y, window):
sig = y.values
iok = ~np.isnan(sig)
sig = sig[iok]
fs = 1
cutoff_month = 1 / window
w = cutoff_month / (fs / 2) # Normalize the frequency
b, a = signal.butter(5, w, 'low')
cut_signal = signal.filtfilt(b, a, sig)
y.values[iok] = cut_signal
return y
def low_pass(self, y, window=12, cutoff=1 / 60.):
# Get weights:
if self.filter_type == 'Moving Average':
wgts = self.low_pass_weights_ma(window)
# Apply weights with xarray:
weight = xr.DataArray(wgts, dims=['window'])
y_pred = y.rolling(time=len(wgts), center=True).construct('window').dot(weight)
if self.filter_type == 'Lanczos':
wgts = self.low_pass_weights_lanczos(window, cutoff)
# Apply weights with xarray:
weight = xr.DataArray(wgts, dims=['window'])
y_pred = y.rolling(time=len(wgts), center=True).construct('window').dot(weight)
if self.filter_type == 'Hanning':
wgts = self.low_pass_weights_hanning(window)
# Apply weights with xarray:
weight = xr.DataArray(wgts, dims=['window'])
y_pred = y.rolling(time=len(wgts), center=True).construct('window').dot(weight)
if self.filter_type == 'FFT':
y_pred = self.low_pass_fft(y, window)
if self.filter_type == 'Butterworth':
y_pred = self.low_pass_butter(y, window)
# Recenter around the initial time mean:
#y_pred = y_pred - y_pred.mean()+y.mean()
return y_pred
def fill_borders(self, d, method='spline'):
x = np.arange(0, len(d.values))
iok = np.argwhere(~np.isnan(d.values))
if self.freq == 'M': c = 12
if self.freq == 'Y': c = 1
# xi = np.concatenate((np.array((-12*2,)),x,np.array((np.max(x)+12*2,))),axis=0)
# yi = np.concatenate((np.array((d.values[iok[0]])),d.values,np.array((d.values[iok][-1]))),axis=0)
xi = np.concatenate((np.array((-c * 10,)), np.array((-c * 5,)), x, np.array((np.max(x) + c * 5,)),
np.array((np.max(x) + c * 10,))), axis=0)
yi = np.concatenate((np.array((d.values[iok[0]])), np.array((d.values[iok[0]])), d.values,
np.array((d.values[iok][-1])), np.array((d.values[iok][-1]))), axis=0)
ioki = np.argwhere(~np.isnan(yi))
if method == 'spline':
s = interpolate.InterpolatedUnivariateSpline(xi[ioki], yi[ioki])
yi = s(xi)
# d.values = yi[1:-1]
d.values = yi[2:-2]
if method == 'poly':
# poly = np.polyfit(x[iok].squeeze(), d.values[iok], deg=5)
# yi = np.polyval(poly, x)
# d.values = yi
poly = np.polyfit(xi[ioki], yi[ioki], deg=5)
yi = np.polyval(poly, xi)
d.values = yi[2:-2]
return d
def fit_predict(self, da):
# Init output DataSet:
RES = xr.DataArray(da.values,
dims='time',
coords={'time': da['time'].values},
name='observed').to_dataset()
# 1st extract the seasonal cycle:
if self.freq != 'Y':
RES['seasonal'] = get_seascyc(da, freq=self.freq, tile=True)
else:
RES['seasonal'] = xr.DataArray(np.zeros_like(RES['observed'].values), dims='time')
# 2nd: Remove linear trend:
y = RES['observed'].values - RES['seasonal'].values
x = np.arange(0, len(y))
iok = np.argwhere(~np.isnan(y))
model = LinearRegression().fit(x[iok], y[iok])
y_pred = model.predict(x[:, np.newaxis]).squeeze()
RES['linear_trend'] = xr.DataArray(y_pred, dims='time')
# 3rd: Very Low Frequency filter out above X years:
y = RES['observed'] - RES['seasonal'] - RES['linear_trend']
y_pred = self.low_pass(y, window=self.window_size, cutoff=1 / self.cutoff)
if self.fill:
y_pred = self.fill_borders(y_pred, method='spline') # Extrapolate start/end points
RES['low_freq'] = y_pred
if self.windowIA_size != np.Inf:
# 4th: Interannual
y = RES['observed'] - RES['seasonal'] - RES['linear_trend'] - RES['low_freq']
y_pred = self.low_pass(y, window=self.windowIA_size, cutoff=1 / self.cutoffIA)
if self.fill:
y_pred = self.fill_borders(y_pred, method='spline') # Extrapolate start/end points
RES['interannual'] = y_pred
# Residual:
if self.windowIA_size != np.Inf:
y = RES['observed'].values - RES['seasonal'].values - RES['linear_trend'].values \
- RES['low_freq'].values - RES['interannual'].values
else:
y = RES['observed'].values - RES['seasonal'].values - RES['linear_trend'].values \
- RES['low_freq'].values
y_pred = y # Residual !
RES['residual'] = xr.DataArray(y_pred, dims='time')
return RES
class VariableExplorer_2comp(param.Parameterized):
preprocessing_rolling_mean_width = param.Integer(default=3, bounds=(0, 6), doc="Pre-processing")
low_frequency_window_size = param.Integer(default=4 * 12, bounds=(1, 12 * 5))
interannual_window_size = param.Integer(default=1 * 12, bounds=(1, 12 * 5))
variable = param.ObjectSelector(default="", objects=list())
filter_type = param.ObjectSelector(default='Butterworth', objects=['Moving Average', 'Hanning', 'Butterworth'])
def __init__(self, ds, freq='M', default="", windows=[3, 4 * 12, 1 * 12], width=850, height=200, ncols=1):
self.ds = ds
self.freq = freq
self.geo = (ncols,width,height)
self.param['preprocessing_rolling_mean_width'].default = windows[0]
self.param['low_frequency_window_size'].default = windows[1]
self.param['interannual_window_size'].default = windows[2]
Variable_list = list(ds.data_vars)
self.param['variable'].objects = Variable_list
if default == "":
self.param['variable'].default = Variable_list[0]
else:
self.param['variable'].default = Variable_list[Variable_list.index(default)]
@param.depends('variable', 'low_frequency_window_size', 'interannual_window_size', 'filter_type',
'preprocessing_rolling_mean_width')
def load_variable(self):
da = self.ds[self.variable].copy()
da.values = da.values - np.mean(da.values)
# if 'volume' in self.variable:
# da.values = da.values / svy
# da.attrs['units'] = 'Svy'
if 'units' not in da.attrs:
da.attrs['units'] = '[?]'
if 'long_name' not in da.attrs:
da.attrs['long_name'] = self.variable
# Pre-processing:
if self.preprocessing_rolling_mean_width > 0:
da.values = (da.rolling(time=self.preprocessing_rolling_mean_width, center=True).mean()).values
# Decomposition:
RES = tsia_decomposer(filter_type=self.filter_type,
window_size=self.low_frequency_window_size,
windowIA_size=self.interannual_window_size,
freq=self.freq,
fill_borders=False).fit_predict(da)
# Now compose a Plot:
# y_obs = {'Time': da['time'].values, da.units: RES['observed'].values}
y_obs = {'Time': da['time'].values, da.units: RES['observed'].values - RES['seasonal'].values}
y_seas = {'Time': da['time'].values, da.units: RES['seasonal'].values}
y_ltrn = {'Time': da['time'].values, da.units: RES['linear_trend'].values}
# y_lowf = {'Time': da['time'].values, da.units: RES['low_freq'].values}
y_lowf = {'Time': da['time'].values, da.units: RES['linear_trend'].values + RES['low_freq'].values}
y_inta = {'Time': da['time'].values, da.units: RES['interannual'].values}
y_resd = {'Time': da['time'].values, da.units: RES['residual'].values}
ydim = hv.Dimension(da.units, range=vrange(y_obs[da.units]))
cv_obs = hv.Curve(y_obs, 'Time', ydim, label='Observation - Seas. Cyc.', group=da.long_name).opts(color='gray',
framewise=True)
# cv_ltrn = hv.Curve(y_ltrn, 'Time', ydim, label='Linear Trend', group=da.long_name).opts(color='orange', framewise=True)
cv_lowf = hv.Curve(y_lowf, 'Time', ydim, label='Linear Trend + Low Freq.', group=da.long_name).opts(color='red',
framewise=True)
cv_inta = hv.Curve(y_inta, 'Time', ydim, label='Interannual', group=da.long_name).opts(color='blue',
framewise=True)
# ydim = hv.Dimension(da.units, range=vrange(y_lowf[da.units]))
ydim = hv.Dimension(da.units, range=vrange(y_seas[da.units]))
cv_ltrn = hv.Curve(y_ltrn, 'Time', ydim, label='Linear Trend', group=da.long_name).opts(color='orange',
framewise=True)
cv_seas = hv.Curve(y_seas, 'Time', ydim, label='Seasonal', group=da.long_name).opts(color='blue',
framewise=True)
cv_resd = hv.Curve(y_resd, 'Time', ydim, label='Residual', group=da.long_name).opts(color='gray',
framewise=True)
w,h = self.geo[1], self.geo[2]
# layout = (cv_obs * cv_inta * cv_lowf.opts(show_grid=True, framewise=True, width=w)
# + cv_resd * cv_seas)
# layout.cols(2)
# layout = (cv_obs * cv_ltrn * cv_seas.opts(show_grid=True, framewise=True, width=w)
# + cv_inta * cv_lowf.opts(show_grid=True, framewise=True, width=w)
# + cv_resd.opts(show_grid=True, framewise=True, width=w))
# layout.cols(1)
# layout = (cv_obs
# * cv_lowf * cv_inta.opts(show_grid=True, framewise=True, width=w)
# + cv_ltrn * cv_seas * cv_resd.opts(show_grid=True, framewise=True, width=w))
layout = hv.Layout((
hv.Overlay( (cv_obs, cv_lowf, cv_inta) ).opts(show_grid=True, framewise=True, width=w)\
.opts(toolbar='above', legend_position='top'),\
hv.Overlay( (cv_ltrn, cv_seas, cv_resd) ).opts(show_grid=True, framewise=True, width=w)\
.opts(toolbar='above', legend_position='bottom')
))
layout.cols(self.geo[0])
return layout
class VariableExplorer_1comp(param.Parameterized):
preprocessing_rolling_mean_width = param.Integer(default=3, bounds=(0, 6), doc="Pre-processing")
low_frequency_window_size = param.Integer(default=1 * 12, bounds=(1, 12 * 10))
variable = param.ObjectSelector(default="", objects=list())
filter_type = param.ObjectSelector(default='Butterworth', objects=['Moving Average', 'Hanning', 'Butterworth'])
def __init__(self, ds, freq='M', default="", windows=[3, 1 * 12], width=850, height=200, ncols=1):
self.ds = ds
self.freq = freq
self.geo = (ncols,width,height)
self.param['preprocessing_rolling_mean_width'].default = windows[0]
self.param['low_frequency_window_size'].default = windows[1]
Variable_list = list(ds.data_vars)
self.param['variable'].objects = Variable_list
if default == "":
self.param['variable'].default = Variable_list[0]
else:
self.param['variable'].default = Variable_list[Variable_list.index(default)]
@param.depends('variable', 'low_frequency_window_size', 'filter_type',
'preprocessing_rolling_mean_width')
def load_variable(self):
da = self.ds[self.variable].copy()
da.values = da.values - np.mean(da.values)
if 'units' not in da.attrs:
da.attrs['units'] = '[?]'
if 'long_name' not in da.attrs:
da.attrs['long_name'] = self.variable
# Pre-processing:
if self.preprocessing_rolling_mean_width > 0:
da.values = (da.rolling(time=self.preprocessing_rolling_mean_width, center=True).mean()).values
# Decomposition:
RES = tsia_decomposer(filter_type=self.filter_type,
window_size=self.low_frequency_window_size,
windowIA_size=np.Inf,
freq=self.freq,
fill_borders=False).fit_predict(da)
# Now compose a Plot:
# y_obs = {'Time': da['time'].values, da.units: RES['observed'].values}
y_obs = {'Time': da['time'].values, da.units: RES['observed'].values - RES['seasonal'].values}
y_seas = {'Time': da['time'].values, da.units: RES['seasonal'].values}
y_ltrn = {'Time': da['time'].values, da.units: RES['linear_trend'].values}
# y_lowf = {'Time': da['time'].values, da.units: RES['low_freq'].values}
y_lowf = {'Time': da['time'].values, da.units: RES['linear_trend'].values + RES['low_freq'].values}
y_resd = {'Time': da['time'].values, da.units: RES['residual'].values}
ydim = hv.Dimension(da.units, range=vrange(y_obs[da.units]))
cv_obs = hv.Curve(y_obs, 'Time', ydim, label='Observation - Seas. Cyc.', group=da.long_name).opts(color='gray',
framewise=True)
# cv_ltrn = hv.Curve(y_ltrn, 'Time', ydim, label='Linear Trend', group=da.long_name).opts(color='orange', framewise=True)
cv_lowf = hv.Curve(y_lowf, 'Time', ydim, label='Linear Trend + Low Freq.', group=da.long_name).opts(color='red',
framewise=True)
ydim = hv.Dimension(da.units, range=vrange(y_seas[da.units]))
cv_ltrn = hv.Curve(y_ltrn, 'Time', ydim, label='Linear Trend', group=da.long_name).opts(color='orange',
framewise=True)
cv_seas = hv.Curve(y_seas, 'Time', ydim, label='Seasonal', group=da.long_name).opts(color='blue',
framewise=True)
cv_resd = hv.Curve(y_resd, 'Time', ydim, label='Residual', group=da.long_name).opts(color='gray',
framewise=True)
w,h = self.geo[1], self.geo[2]
# layout = (cv_obs * cv_inta * cv_lowf.opts(show_grid=True, framewise=True, width=w)
# + cv_resd * cv_seas)
# layout.cols(2)
# layout = (cv_obs * cv_ltrn * cv_seas.opts(show_grid=True, framewise=True, width=w)
# + cv_inta * cv_lowf.opts(show_grid=True, framewise=True, width=w)
# + cv_resd.opts(show_grid=True, framewise=True, width=w))
# layout.cols(1)
layout = hv.Layout((
hv.Overlay( (cv_obs,cv_lowf) ).opts(show_grid=True, framewise=True, width=w)\
.opts(toolbar='above', legend_position='top'),\
hv.Overlay( (cv_seas,cv_resd) ).opts(show_grid=True, framewise=True, width=w)\
.opts(toolbar='above', legend_position='bottom')
))
layout.cols(self.geo[0])
return layout
def dashboard(*args, **kwargs):
"""Open an interactive dashboard to visualise time decomposition of xr.DataSet time series
Examples:
dashboard(ds, default='ohc', windows=[6,48,8])
dashboard(ds, freq='M', default='MW_volume', windows=[3,48,12], ncols=1, width=1200)
"""
if 'windows' in kwargs:
windows = kwargs.get('windows')
if len(windows)==2:
explorer = VariableExplorer_1comp(*args, **kwargs)
elif len(windows)==3:
explorer = VariableExplorer_2comp(*args, **kwargs)
var_dmap = hv.DynamicMap(explorer.load_variable)
panel = pn.Row(explorer.param, var_dmap)
return panel
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment