-
-
Save vannguyen3007/9380f1b845b1ad21cf99b15c117c5499 to your computer and use it in GitHub Desktop.
Extended Time Series Split
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging
from typing import Optional

import numpy as np
from sklearn.model_selection._split import _BaseKFold
from sklearn.utils import indexable
from sklearn.utils.validation import _num_samples
# Module-level logger, namespaced to this module; currently unused within
# the visible file (presumably kept for downstream users — TODO confirm).
LOGGER = logging.getLogger(__name__)
class TimeSeriesSplit(_BaseKFold):  # pylint: disable=abstract-method
    """Time Series cross-validator.

    Provides train/test indices to split time series data samples that are
    observed at fixed time intervals, in train/test sets. In each split, test
    indices must be higher than before, and thus shuffling in cross validator
    is inappropriate.

    This cross_validation object is a variation of :class:`TimeSeriesSplit`
    from the popular scikit-learn package. It extends its base functionality
    to allow for expanding windows, and rolling windows with configurable
    train and test sizes and delays between each. i.e. train on weeks 1-8,
    skip week 9, predict week 10-11. In this implementation we specifically
    force the test size to be equal across all splits.

    Expanding Window:

        Idx / Time  0..............................................n
        1           | train | delay | test |                       |
        2           | train         | delay | test |               |
        ...         |                                              |
        last        | train                       | delay | test   |

    Rolling Windows:

        Idx / Time  0..............................................n
        1           | train | delay | test |                       |
        2           | step | train | delay | test |                |
        ...         |                                              |
        last        | step | ... | step | train | delay | test     |

    Parameters:
        n_splits : int, default=5
            Number of splits. Must be at least 5.
        train_size : int, optional
            Size for a single training set. When omitted, the training window
            expands to cover all samples before each test window.
        test_size : int, optional, must be positive
            Size of a single testing set.
        delay : int, default=0, must be non-negative
            Number of index shifts to make between train and test sets, e.g.
            delay=0 -> TRAIN: [0 1 2 3] TEST: [4]
            delay=1 -> TRAIN: [0 1 2 3] TEST: [5]
            delay=2 -> TRAIN: [0 1 2 3] TEST: [6]
        force_step_size : int, optional
            Ignore split logic and force the training data to shift by the
            step size forward for n_splits, e.g.
            TRAIN: [0 1 2 3] TEST: [4]
            TRAIN: [0 1 2 3 4] TEST: [5]
            TRAIN: [0 1 2 3 4 5] TEST: [6]
            TRAIN: [0 1 2 3 4 5 6] TEST: [7]

    Examples
    --------
    >>> X = np.array([[1, 2], [3, 4], [1, 2], [3, 4], [1, 2], [3, 4]])
    >>> y = np.array([1, 2, 3, 4, 5, 6])
    >>> tscv = TimeSeriesSplit(n_splits=5)
    >>> print(tscv)  # doctest: +NORMALIZE_WHITESPACE
    TimeSeriesSplit(train_size=None, n_splits=5)
    >>> for train_index, test_index in tscv.split(X):
    ...     print('TRAIN:', train_index, 'TEST:', test_index)
    ...     X_train, X_test = X[train_index], X[test_index]
    ...     y_train, y_test = y[train_index], y[test_index]
    TRAIN: [0] TEST: [1]
    TRAIN: [0 1] TEST: [2]
    TRAIN: [0 1 2] TEST: [3]
    TRAIN: [0 1 2 3] TEST: [4]
    TRAIN: [0 1 2 3 4] TEST: [5]
    """

    def __init__(self,
                 n_splits: Optional[int] = 5,
                 train_size: Optional[int] = None,
                 test_size: Optional[int] = None,
                 delay: int = 0,
                 force_step_size: Optional[int] = None):
        # BUG FIX (docs): the docstring previously claimed "at least 4" while
        # this check enforces a minimum of 5; the docstring now matches.
        if n_splits and n_splits < 5:
            raise ValueError(f'Cannot have n_splits less than 5 (n_splits={n_splits})')
        super().__init__(n_splits, shuffle=False, random_state=None)
        self.train_size = train_size
        if test_size and test_size < 0:
            raise ValueError(f'Cannot have negative values of test_size (test_size={test_size})')
        self.test_size = test_size
        if delay < 0:
            raise ValueError(f'Cannot have negative values of delay (delay={delay})')
        self.delay = delay
        if force_step_size and force_step_size < 1:
            raise ValueError(f'Cannot have zero or negative values of force_step_size '
                             f'(force_step_size={force_step_size}).')
        self.force_step_size = force_step_size

    def split(self, X, y=None, groups=None):
        """Generate indices to split data into training and test set.

        Parameters:
            X : array-like, shape (n_samples, n_features)
                Training data, where n_samples is the number of samples and
                n_features is the number of features.
            y : array-like, shape (n_samples,)
                Always ignored, exists for compatibility.
            groups : array-like, with shape (n_samples,), optional
                Always ignored, exists for compatibility.

        Yields:
            train : ndarray
                The training set indices for that split.
            test : ndarray
                The testing set indices for that split.

        Raises:
            ValueError: if the requested fold geometry does not fit in X.
        """
        X, y, groups = indexable(X, y, groups)  # pylint: disable=unbalanced-tuple-unpacking
        n_samples = _num_samples(X)
        n_splits = self.n_splits
        n_folds = n_splits + 1
        delay = self.delay
        if n_folds > n_samples:
            raise ValueError(f'Cannot have number of folds={n_folds} greater than the number of samples: {n_samples}.')
        indices = np.arange(n_samples)
        split_size = n_samples // n_folds
        train_size = self.train_size or split_size * self.n_splits
        test_size = self.test_size or n_samples // n_folds
        # A "full" test window is the delay gap plus the actual test samples.
        full_test = test_size + delay
        if full_test + n_splits > n_samples:
            # BUG FIX: the original f-string contained literal '\\(' escape
            # sequences, which rendered stray backslashes in the message.
            raise ValueError(f'test_size ({test_size}) + delay ({delay}) + n_splits ({n_splits}) = '
                             f'{full_test + n_splits} is greater than the number of samples: '
                             f'{n_samples}. Cannot create fold logic.')
        # Generate logic for splits: compute the start index of each test
        # window (overridden entirely when force_step_size is specified).
        if self.force_step_size:
            # Forced step: anchor the final fold flush with the end of the
            # data and walk forward in multiples of step_size.
            step_size = self.force_step_size
            final_fold_start = n_samples - (train_size + full_test)
            range_start = (final_fold_start % step_size) + train_size
        elif not self.train_size:
            # Expanding window: each fold's train set grows by one
            # split-sized chunk; leftover samples go to the first fold.
            step_size = split_size
            range_start = (split_size - full_test) + split_size + (n_samples % n_folds)
        else:
            # Rolling window: spread the slack between folds evenly.
            step_size = (n_samples - (train_size + full_test)) // n_folds
            if step_size == 0:
                # BUG FIX: the original fell through to range(..., step=0),
                # which raises an uninformative "range() arg 3 must not be
                # zero". Report the geometry problem explicitly instead.
                raise ValueError(f'train_size ({train_size}) + test_size ({test_size}) + '
                                 f'delay ({delay}) leave too few of the {n_samples} samples '
                                 f'to step through {n_splits} splits.')
            final_fold_start = n_samples - (train_size + full_test)
            range_start = (final_fold_start - (step_size * (n_splits - 1))) + train_size
        test_starts = range(range_start, n_samples, step_size)
        # Generate data splits.
        for test_start in test_starts:
            # Expanding windows always train from index 0.
            idx_start = test_start - train_size if self.train_size is not None else 0
            # Ensure we always return a test set of the same size: skip any
            # trailing start whose test window would run past the data.
            if indices[test_start:test_start + full_test].size < full_test:
                continue
            yield (indices[idx_start:test_start],
                   indices[test_start + delay:test_start + full_test])
if __name__ == '__main__':
    # Demo 1: expanding-window splits over a tiny 6-sample dataset.
    features = np.array([[1, 2], [3, 4], [1, 2], [3, 4], [1, 2], [3, 4]])
    targets = np.array([1, 2, 3, 4, 5, 6])
    expanding_cv = TimeSeriesSplit(n_splits=5)
    print(expanding_cv)  # doctest: +NORMALIZE_WHITESPACE
    for train_idx, test_idx in expanding_cv.split(features):
        print('TRAIN:', train_idx, 'TEST:', test_idx)
        X_train, X_test = features[train_idx], features[test_idx]
        y_train, y_test = targets[train_idx], targets[test_idx]
    print("---------------------------------------------")
    # Demo 2: rolling window with fixed train size, test size and a delay.
    sample_ids = np.arange(0, 30)
    rolling_cv = TimeSeriesSplit(train_size=10, test_size=5, delay=3)
    print(rolling_cv)
    for train_idx, test_idx in rolling_cv.split(sample_ids):
        print('TRAIN:', train_idx, 'TEST:', test_idx)
        X_train, X_test = sample_ids[train_idx], sample_ids[test_idx]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment