# Extended Time Series Split
import logging
from typing import Optional

import numpy as np
from sklearn.model_selection._split import _BaseKFold
from sklearn.utils import indexable
from sklearn.utils.validation import _num_samples
LOGGER = logging.getLogger(__name__)
class TimeSeriesSplit(_BaseKFold):  # pylint: disable=abstract-method
    """Time Series cross-validator.

    Provides train/test indices to split time series data samples that are
    observed at fixed time intervals, in train/test sets. In each split, test
    indices must be higher than before, and thus shuffling in cross validator
    is inappropriate.

    This cross-validation object is a variation of :class:`TimeSeriesSplit`
    from the popular scikit-learn package. It extends its base functionality
    to allow for expanding windows, and rolling windows with configurable
    train and test sizes and delays between each, i.e. train on weeks 1-8,
    skip week 9, predict weeks 10-11.

    In this implementation we specifically force the test size to be equal
    across all splits.

    Expanding Window::

        Idx / Time  0..............................................n
        1           |  train  | delay |  test  |                   |
        2           |     train     | delay |  test  |             |
        ...         |                                              |
        last        |            train            | delay |  test  |

    Rolling Windows::

        Idx / Time  0..............................................n
        1           | train | delay | test |                       |
        2           | step | train | delay | test |                |
        ...         |                                              |
        last        | step | ... | step | train | delay | test     |

    Parameters
    ----------
    n_splits : int, default=5
        Number of splits. Must be at least 5.
    train_size : int, optional, must not be negative
        Size for a single training set (rolling window). When omitted (or 0),
        every training set starts at index 0 (expanding window).
    test_size : int, optional, must not be negative
        Size of a single testing set. When omitted (or 0), it defaults to
        ``n_samples // (n_splits + 1)``.
    delay : int, default=0, must not be negative
        Number of index shifts to make between train and test sets, e.g.::

            delay=0
                TRAIN: [0 1 2 3] TEST: [4]
            delay=1
                TRAIN: [0 1 2 3] TEST: [5]
            delay=2
                TRAIN: [0 1 2 3] TEST: [6]

    force_step_size : int, optional, must be at least 1
        Ignore split logic and force the training data to shift forward by
        the step size between consecutive splits, e.g.::

            TRAIN: [0 1 2 3]       TEST: [4]
            TRAIN: [0 1 2 3 4]     TEST: [5]
            TRAIN: [0 1 2 3 4 5]   TEST: [6]
            TRAIN: [0 1 2 3 4 5 6] TEST: [7]

    Examples
    --------
    >>> X = np.array([[1, 2], [3, 4], [1, 2], [3, 4], [1, 2], [3, 4]])
    >>> y = np.array([1, 2, 3, 4, 5, 6])
    >>> tscv = TimeSeriesSplit(n_splits=5)
    >>> print(tscv)  # doctest: +NORMALIZE_WHITESPACE
    TimeSeriesSplit(delay=0, force_step_size=None, n_splits=5, test_size=None,
            train_size=None)
    >>> for train_index, test_index in tscv.split(X):
    ...     print('TRAIN:', train_index, 'TEST:', test_index)
    ...     X_train, X_test = X[train_index], X[test_index]
    ...     y_train, y_test = y[train_index], y[test_index]
    TRAIN: [0] TEST: [1]
    TRAIN: [0 1] TEST: [2]
    TRAIN: [0 1 2] TEST: [3]
    TRAIN: [0 1 2 3] TEST: [4]
    TRAIN: [0 1 2 3 4] TEST: [5]
    """

    def __init__(self,
                 n_splits: Optional[int] = 5,
                 train_size: Optional[int] = None,
                 test_size: Optional[int] = None,
                 delay: int = 0,
                 force_step_size: Optional[int] = None):
        """Validate and store the split configuration.

        Raises:
            ValueError: if ``n_splits`` is below 5, if ``train_size``,
                ``test_size`` or ``delay`` is negative, or if
                ``force_step_size`` is negative.
        """
        if n_splits and n_splits < 5:
            raise ValueError(f'Cannot have n_splits less than 5 (n_splits={n_splits})')
        super().__init__(n_splits, shuffle=False, random_state=None)

        # A negative train_size would produce empty or nonsensical training slices in split().
        if train_size is not None and train_size < 0:
            raise ValueError(f'Cannot have negative values of train_size (train_size={train_size})')
        self.train_size = train_size

        if test_size and test_size < 0:
            raise ValueError(f'Cannot have negative values of test_size (test_size={test_size})')
        self.test_size = test_size

        if delay < 0:
            raise ValueError(f'Cannot have negative values of delay (delay={delay})')
        self.delay = delay

        if force_step_size and force_step_size < 1:
            raise ValueError(f'Cannot have zero or negative values of force_step_size '
                             f'(force_step_size={force_step_size}).')
        self.force_step_size = force_step_size

    def split(self, X, y=None, groups=None):
        """Generate indices to split data into training and test set.

        Parameters:
            X : array-like, shape (n_samples, n_features)
                Training data, where n_samples is the number of samples and
                n_features is the number of features.
            y : array-like, shape (n_samples,)
                Always ignored, exists for compatibility.
            groups : array-like, with shape (n_samples,), optional
                Always ignored, exists for compatibility.

        Yields:
            train : ndarray
                The training set indices for that split.
            test : ndarray
                The testing set indices for that split.

        Raises:
            ValueError: if there are fewer samples than ``n_splits + 1``, if
                ``test_size + delay + n_splits`` exceeds the number of
                samples, or if a rolling window (``train_size`` given without
                ``force_step_size``) cannot advance by at least one index
                between splits.
        """
        X, y, groups = indexable(X, y, groups)  # pylint: disable=unbalanced-tuple-unpacking
        n_samples = _num_samples(X)
        n_splits = self.n_splits
        n_folds = n_splits + 1
        delay = self.delay
        if n_folds > n_samples:
            raise ValueError(f'Cannot have number of folds={n_folds} greater than the number of samples: {n_samples}.')

        indices = np.arange(n_samples)
        split_size = n_samples // n_folds
        # A missing train_size and train_size=0 both mean "expanding window";
        # decide once so every later branch agrees.
        rolling = bool(self.train_size)
        train_size = self.train_size or split_size * n_splits
        test_size = self.test_size or split_size
        # The delay gap and the test set move together as one trailing window.
        full_test = test_size + delay
        if full_test + n_splits > n_samples:
            raise ValueError(f'test_size({test_size}) + delay({delay}) = {full_test} + '
                             f'n_splits={n_splits} is greater than the number of samples: '
                             f'{n_samples}. Cannot create fold logic.')

        # Work out where the first delay+test window starts and how far each
        # subsequent window is shifted.
        if self.force_step_size:
            # Forced step: n_splits is not consulted here; the first start is
            # offset so that the stepped sequence lines up with the window
            # that ends on the last sample.
            step_size = self.force_step_size
            final_fold_start = n_samples - (train_size + full_test)
            range_start = (final_fold_start % step_size) + train_size
        elif not rolling:
            # Expanding window: advance by one fold per split; leftover
            # samples (n_samples % n_folds) are absorbed before the first window.
            step_size = split_size
            range_start = (split_size - full_test) + split_size + (n_samples % n_folds)
        else:
            # Rolling window: place n_splits equally spaced windows, the last
            # of which ends on the final sample.
            final_fold_start = n_samples - (train_size + full_test)
            step_size = final_fold_start // n_folds
            if step_size < 1:
                # A zero step would make range() fail with an opaque message
                # and a negative step would silently yield no splits at all.
                raise ValueError(f'train_size({train_size}) + test_size({test_size}) + delay({delay}) '
                                 f'leaves too few of the {n_samples} samples to place {n_splits} '
                                 f'rolling splits. Cannot create fold logic.')
            range_start = (final_fold_start - (step_size * (n_splits - 1))) + train_size

        for test_start in range(range_start, n_samples, step_size):
            test_stop = test_start + full_test
            # Only emit windows that lie fully inside the data so that every
            # test set has the same size.
            if test_start < 0 or test_stop > n_samples:
                continue
            train_start = test_start - train_size if rolling else 0
            yield (indices[train_start:test_start],
                   indices[test_start + delay:test_stop])
if __name__ == '__main__':
    def _show_splits(splitter, features, targets=None):
        """Print the splitter, then every TRAIN/TEST index pair it generates for ``features``."""
        print(splitter)
        for train_idx, test_idx in splitter.split(features):
            print('TRAIN:', train_idx, 'TEST:', test_idx)
            # Index the data with the generated indices to show they are usable as-is.
            _ = features[train_idx], features[test_idx]
            if targets is not None:
                _ = targets[train_idx], targets[test_idx]

    # Demo 1: default expanding window over six samples.
    _show_splits(
        TimeSeriesSplit(n_splits=5),
        np.array([[1, 2], [3, 4], [1, 2], [3, 4], [1, 2], [3, 4]]),
        np.array([1, 2, 3, 4, 5, 6]),
    )

    print("---------------------------------------------")

    # Demo 2: rolling window with a fixed train size, fixed test size and a delay gap.
    _show_splits(
        TimeSeriesSplit(train_size=10, test_size=5, delay=3),
        np.arange(0, 30),
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment