Created
January 18, 2012 06:54
-
-
Save danhammer/1631591 to your computer and use it in GitHub Desktop.
trends
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os, sys, json | |
import numpy as np | |
try: | |
#import matplotlib.pylab as plt | |
from scikits.timeseries.lib.interpolate import * | |
import scikits.statsmodels.sandbox.stats.diagnostic as diag | |
from scikits.statsmodels.tsa.filters import hpfilter | |
import scikits.statsmodels.api as sm | |
except ImportError: | |
print "Problems importing scikits. Hope that's ok" | |
from forma import common | |
from forma.tools import date_utils, file_utils, transfer, misc, status | |
from forma.jobs.preprocess.dynamic.modis import timeseries | |
from forma.jobs.preprocess import static | |
from forma.aws import sqs | |
# MODIS layers pulled for each pixel time-series (spectral + QA bands).
LAYERS = ["ndvi", "quality", "reliability", "vcf"]
# Threshold on the mean reliability code -- presumably series whose mean
# reliability exceeds this are dropped; verify against callers (TODO).
MAX_MEAN_RELIABILITY = 2
# TODO: write tests to check certain parameters of time-series, before | |
# and after input into the function. | |
# TODO: if there are any iterations, looping through the time-series, | |
# write tests to ensure that there aren't any "hanging chads" | |
# i.e. that there are precisely the correct number of panel units and | |
# time periods in the data | |
def make_reliable(target_ts, reli_ts=None):
    """
    Clean up unreliable spectral values by linear interpolation.

    Args:
    -----------
    target_ts : spectral time-series (e.g., NDVI) as a numpy array
    reli_ts   : optional reliability time-series of the same length;
                a value of 3 marks a completely unreliable observation

    Returns:
    -----------
    target_cleaned : time-series with unreliable values replaced by
                     linear interpolation; leading/trailing gaps are
                     filled with the first/last available observation
    """
    # Mask out completely unreliable (reli_ts == 3) and/or marginally
    # unreliable data (reli_ts == 1).
    # BUG FIX: the original tested `type(reli_ts) == 'numpy.ndarray'`,
    # comparing a type object against a *string* -- always False, so the
    # reliability mask was never applied.  Use isinstance instead.
    if isinstance(reli_ts, np.ndarray):
        mask = (reli_ts == 3)# | (reli_ts == 1)
        target_masked = np.ma.array(target_ts, mask=mask)
    else:
        # No reliability series supplied: leave every value unmasked.
        mask = np.ma.make_mask(len(target_ts))
        target_masked = np.ma.array(target_ts, mask=~mask)
    # Interpolate internal values (default: linear interpolation),
    # then fill the ends with respectively the last or first available
    # observation.
    target_cleaned = interp_masked1d(target_masked)
    target_cleaned = forward_fill(target_cleaned)
    target_cleaned = backward_fill(target_cleaned)
    return target_cleaned
def seasonal_decomposition(time_series):
    """
    Seasonal adjustment of an additive time-series (`Y`) by first
    removing the Trend (`T`) and Cyclical (`C`) components, and then
    averaging over each time period to get the Seasonal (`S`)
    component.  Note that

        Y = T + C + S + e,

    where `e` is the idiosyncratic error.  The function returns the
    deseasonalized time-series, or `Y - S = T + C + e`.  This function
    is very similar to the `stl` command in R's `stats` package.

    Currently the function is written only for 16-day periods, of
    which there are 23 in a given year.
    TODO (dan): extend for other frequencies

    Args:
    -----------
    time_series : time-series with 16-day frequency (len >= 46 so that
                  every season is observed at least once unmasked)

    Returns:
    -----------
    dts : deseasonalized time-series

    References:
    -----------
    www.dst.dk/upload/seasonal.pdf
    """
    # Number of separate periods per year (e.g., quarterly data would
    # yield seasons = 4)
    seasons = 23
    pds = len(time_series)
    # Mask out end values for which there aren't enough values on
    # either side for the full moving average.  For a frequency of 23
    # seasons per year, the buffer is 11 before and 12 after a given
    # observation (the centered window is even-length, hence asymmetric).
    m = np.ma.make_mask([1]*11 + [0]*(pds - seasons) + [1]*12)
    ts_mask = np.ma.array(time_series, mask=m)
    # Define the window length for the moving average.  If the first
    # observation is the first season in a year, then the last
    # observation in the window will be the first season in the next
    # year, i.e., the window must be one period longer than the number
    # of seasons.  The half-weighted end points keep the total weight
    # equal to one year.
    window_len = seasons + 1
    weightings = [.5] + [1]*(window_len - 2) + [.5]
    weightings = np.array(weightings)/seasons
    smoothed = np.convolve(time_series, weightings)[window_len-1:-(window_len-1)]
    # Populate a masked array of the same dimension and mask as the
    # original time-series with the difference between the original
    # values and the smoothed moving average.
    difference = ts_mask[~ts_mask.mask].data - smoothed
    diff_ts = np.ma.array(np.empty(pds), mask=m)
    diff_ts[~diff_ts.mask] = difference
    # Pad the differenced series out to a whole number of years so it
    # can be reshaped into a (years x seasons) rectangle.
    # BUG FIX: `np.ceil` returns a float, which modern numpy rejects as
    # an argument to np.empty, np.repeat, and reshape -- cast to int.
    years = int(np.ceil(pds/float(seasons)))
    buf_pds = years*seasons - pds
    filler_mask = np.ma.array(np.empty(buf_pds), mask=np.repeat(True, buf_pds))
    reshape_ts = np.ma.concatenate([diff_ts, filler_mask])
    periodic = reshape_ts.reshape(years, seasons)
    # Average within each season across years (masked entries ignored).
    seas_avg = np.ma.mean(periodic, axis=0)
    repeat_seas = np.repeat([seas_avg], years, axis=0)
    # Flatten the seasonal components into an array that can be used
    # to difference the original time series.
    total_seas = [y for x in repeat_seas for y in x]
    dts = time_series - total_seas[:len(time_series)]
    return dts
def results_lm(time_series, cofactor_ts=None):
    """
    Create an object of RegressionResults by regressing the series on
    a linear time trend (plus an optional cofactor) with a constant.

    Args:
    -----------
    time_series : endogenous time-series (e.g., NDVI)
    cofactor_ts : exogenous cofactor (e.g., rainfall)

    Returns:
    -----------
    results : RegressionResults class
    """
    time_step = np.array(range(len(time_series)))
    # BUG FIX: `cofactor_ts == None` compares elementwise when the
    # cofactor is a numpy array and raises on truth-testing; identity
    # comparison with `is None` is the correct test.
    if cofactor_ts is None:
        X = sm.add_constant(time_step, prepend=False)
    else:
        # Stack trend and cofactor as columns of the design matrix.
        cx = np.vstack([time_step, cofactor_ts]).T
        X = sm.add_constant(cx)
    results = sm.OLS(time_series, X).fit()
    return results
def hansen_stat(linear_results):
    """
    Calculate the Hansen break statistic for parameter instability.

    Args:
    -----------
    linear_results : RegressionResults class for a linear model
                     (only `.model.exog` and `.resid` are used)

    Returns:
    -----------
    Hansen joint-stability test statistic (trace form)

    References:
    -----------
    B. E. Hansen. Testing for parameter instability in linear models.
    """
    # (Removed the unused `y = ...model.endog` local from the original.)
    x = linear_results.model.exog
    resid = linear_results.resid
    nobs, nvars = x.shape
    resid2 = resid**2
    # Score contributions: one column per regressor (x_i * e_i) plus a
    # column for the demeaned squared residuals (variance stability).
    ft = np.c_[x*resid[:,None], (resid2 - resid2.mean())]
    # Cumulative scores over time.
    s = ft.cumsum(0)
    # V = n * sum_t f_t f_t'  and  S = sum_t s_t s_t'
    fval = nobs*(ft[:,:,None]*ft[:,None,:]).sum(0)
    ssum = (s[:,:,None]*s[:,None,:]).sum(0)
    # Trace statistic tr(V^{-1} S); no longer shadows the function name.
    return np.trace(np.dot(np.linalg.inv(fval), ssum))
def whizbang(linear_results):
    """
    Collect the long-term linear trend, its t-statistic, and -- only
    for significantly downward-trending series -- the Lagrange
    multiplier test statistic for autocorrelation.

    Args:
    -----------
    linear_results : RegressionResults class for a linear model

    Returns:
    -----------
    trend : linear trend
    tval  : student test statistic
    acorr : Lagrange multiplier test statistic, or the sentinel -9999
            when the trend is not significantly negative

    References:
    -----------
    Breusch, T. S. and Pagan, A. R. (1980). The Lagrange multiplier
    test and its applications to model specification in
    econometrics. Review of Economic Studies, 47, 239-253
    """
    # TODO (dan): Check the default value for acorr
    y = linear_results.model.endog
    trend = linear_results.params[0]
    tval = linear_results.tvalues[0]
    # Run the (expensive) LM autocorrelation test only when the slope
    # is significantly negative at the 5% level.
    acorr = diag.acorr_lm(y)[0] if tval < -1.96 else -9999
    return trend, tval, acorr
def whoopbang(spectral_ts, block_len=30, window_len=10):
    """
    Calculate the "steepest" short-term drop by first calculating the
    piecewise linear trends over sliding blocks, and second smoothing
    over the trends with a moving average.  Collect the minimum (most
    negative) value of the smoothed-over trends.

    Args:
    -----------
    spectral_ts : spectral time-series; must satisfy
                  len(spectral_ts) >= block_len + 2*(window_len - 1)
                  so the smoothed slice is non-empty
    block_len   : length of each sliding block used to fit a trend
    window_len  : length of the moving-average smoothing window

    Returns:
    -----------
    minimum value of the smoothed short-term trends
    """
    # Cofactor matrix of time trend + ones vector (for intercept).
    time_step = np.arange(1, block_len + 1)
    X = np.vstack([time_step, np.ones(block_len)]).T
    # Stack every contiguous block of length block_len as a row.
    # FIX: Python-2-only `xrange` replaced with `range`; the quadratic
    # repeated np.vstack in a loop replaced with one array construction.
    n_blocks = len(spectral_ts) - block_len + 1
    Y = np.array([spectral_ts[i:i + block_len] for i in range(n_blocks)])
    # One least-squares solve for all blocks at once; keep the slopes.
    trend_blocks = np.linalg.lstsq(X, Y.T)[0][0]
    # Moving average to smooth short-term breaks.
    weightings = np.repeat(1.0, window_len) / window_len
    smoothed = np.convolve(trend_blocks, weightings)[window_len-1:-(window_len-1)]
    # Minimum value of smoothed values.
    return smoothed.min()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment