Skip to content

Instantly share code, notes, and snippets.

@mzdravkov
Created December 16, 2023 20:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mzdravkov/ddf88e44cf5808496dcfc7f22273f208 to your computer and use it in GitHub Desktop.
Save mzdravkov/ddf88e44cf5808496dcfc7f22273f208 to your computer and use it in GitHub Desktop.
Time series forecasting with Fourier transformations
def build_sequences(time_series, valid_periods, categories, train_size, test_size):
    """
    Creates all possible test sequences with size <test_size> which have
    a training sequence of <train_size> in front.

    Parameters
    ----------
    time_series : iterable of 1-D arrays, the raw series (padded).
    valid_periods : iterable of (start, end) ranges passed to cut_valid.
    categories : iterable of per-series category labels.
    train_size : length of each training window.
    test_size : length of each forecast target window.

    Returns
    -------
    (X, y, categories) as numpy arrays: training windows of length
    train_size, target windows of length test_size, and the category
    label repeated for every generated (X, y) pair.
    """
    X = []
    y = []
    final_categories = []
    # NOTE: renamed the loop variable from `range` — it shadowed the builtin.
    for ts, valid_range, category in zip(time_series, valid_periods, categories):
        valid_ts = cut_valid(ts, valid_range)
        size = len(valid_ts)
        # Number of rolling splits TimeSeriesSplit could produce.
        splits = (size - train_size) // test_size
        if splits < 2:
            # Too short for rolling splits: produce exactly one (X, y) pair.
            if size < train_size + test_size:
                # Series shorter than one full window: stretch everything
                # before the target to train_size via interpolation.
                train_ts = valid_ts[0:-test_size]
                train_ts = interpolate_to_size(train_ts, train_size)
                X.append(train_ts)
            else:
                # Exactly one natural window fits: take the last train_size
                # points before the final test_size target.
                start = size - train_size - test_size
                X.append(valid_ts[start:-test_size])
            y.append(valid_ts[-test_size:])
            final_categories.append(category)
        else:
            # Long series: one interpolated full-history window...
            X.append(interpolate_to_size(valid_ts[0:-test_size], train_size))
            y.append(valid_ts[-test_size:])
            final_categories.append(category)
            # ...plus a vertically-flipped copy as augmentation
            # (reflect values around 0 and shift back up by the max).
            flipped = (-1*valid_ts) + np.max(valid_ts)
            X.append(interpolate_to_size(flipped[0:-test_size], train_size))
            y.append(flipped[-test_size:])
            final_categories.append(category)
            # ...plus every rolling (train, test) split, forwards and on the
            # time-reversed series (another augmentation).
            ts_splitter = TimeSeriesSplit(n_splits=splits, max_train_size=train_size, test_size=test_size)
            reversed_ts = valid_ts[::-1]  # hoisted: was rebuilt twice per split
            for train_seq_ix, test_seq_ix in ts_splitter.split(valid_ts):
                X.append(valid_ts[train_seq_ix[0]:train_seq_ix[-1]+1])
                y.append(valid_ts[test_seq_ix[0]:test_seq_ix[-1]+1])
                final_categories.append(category)
                X.append(reversed_ts[train_seq_ix[0]:train_seq_ix[-1]+1])
                y.append(reversed_ts[test_seq_ix[0]:test_seq_ix[-1]+1])
                final_categories.append(category)
    return np.array(X), np.array(y), np.array(final_categories)
# Build (window, target, category) triples for both splits:
# 100-step training windows predicting the following 9 steps.
X_train_val, y_train_val, cat_train_val = build_sequences(time_series_train, valid_periods_train, categories_train, 100, 9)
X_test, y_test, cat_test = build_sequences(time_series_test, valid_periods_test, categories_test, 100, 9)
def create_fft_model(categories, input_size, output_size):
    """
    Build and compile a bidirectional-GRU + Conv1D forecaster that consumes
    hfft-transformed windows with one-hot category columns prepended.

    Parameters
    ----------
    categories : number of one-hot category columns prepended to each sample.
    input_size : length of the raw time-series window (before hfft).
    output_size : number of future steps to predict.
    """
    # numpy's hfft of a length-n real window yields 2*(n-1) values, so the
    # time-series part of the flat input vector is that long.
    input_size = 2*(input_size - 1)
    input_layer = keras.layers.Input(shape=(input_size + categories, 1), name='input_layer')
    # First `categories` positions hold the one-hot category encoding.
    # NOTE(review): `category_encoding` is never wired into the network —
    # the Concatenate below is commented out — so the model currently
    # ignores the category columns. Confirm this is intentional.
    category_encoding = keras.layers.Cropping1D((0, input_size), name='category_cropping')(input_layer)
    category_encoding = keras.layers.Flatten(name='category_input_flattened')(category_encoding)
    # Remaining positions hold the hfft-transformed time-series window.
    ts_window = keras.layers.Cropping1D((categories, 0), name='time_series_input')(input_layer)
    # x = keras.layers.Dropout(0.2)(ts_window)
    # x = keras.layers.Conv1D(64, 3, padding='valid', activation='relu', name='conv1')(x)
    # x = keras.layers.Conv1D(128, 3, strides=2, padding='valid', activation='relu', name='conv2')(x)
    # NOTE(review): the 'bidirectional_lstm*' names wrap GRU cells, not
    # LSTMs; names are kept as-is because they are baked into any saved
    # checkpoints.
    x = keras.layers.Bidirectional(keras.layers.GRU(128,
                                                    activation='tanh',
                                                    return_sequences=True,
                                                    name='gru1'), name='bidirectional_lstm1')(ts_window)
    # x = keras.layers.Conv1D(64, 3, padding='valid', activation='relu', name='conv1')(x)
    # x = keras.layers.Dropout(0.2)(x)
    # x = keras.layers.Conv1D(128, 3, strides=2, padding='valid', activation='relu', name='conv2')(x)
    # x = keras.layers.Dropout(0.2)(x)
    x = keras.layers.Bidirectional(keras.layers.GRU(128,
                                                    activation='tanh',
                                                    return_sequences=True,
                                                    name='gru2'), name='bidirectional_lstm2')(x)
    # x = keras.layers.Concatenate(name='concat_ts_category')([x, category_encoding])
    # Convolutional head condenses the GRU sequence before the dense output.
    x = keras.layers.Conv1D(64, 3, padding='valid', activation='relu', name='conv3')(x)
    # x = keras.layers.Dropout(0.2)(x)
    x = keras.layers.Conv1D(128, 3, strides=2, padding='valid', activation='relu', name='conv4')(x)
    # x = keras.layers.Dropout(0.2)(x)
    x = keras.layers.Flatten()(x)
    # ReLU output: predictions are constrained to be non-negative.
    output_layer = keras.layers.Dense(output_size, activation='relu', name='output_layer')(x)
    model = tf.keras.Model(inputs=input_layer, outputs=output_layer, name='conv_lstm')
    model.compile(loss=tf.keras.losses.MeanSquaredError(), optimizer=tf.keras.optimizers.Adam())
    return model
# 6 category columns, 100-step input windows, 9-step forecasts
# (matching the build_sequences call above).
fft_model = create_fft_model(6, 100, 9)
fft_model.summary()
# One-hot encode the category labels so they can be prepended to the
# transformed time-series inputs fed to the model.
from sklearn.preprocessing import OneHotEncoder
enc = OneHotEncoder()
cat_train_val_one_hot = enc.fit_transform(cat_train_val.reshape(-1, 1))
# BUG FIX: the test set must be encoded with `transform`, not
# `fit_transform` — refitting on the test labels would re-derive the
# category→column mapping from test data and could reorder or resize the
# columns whenever the test category set differs from training.
cat_test_one_hot = enc.transform(cat_test.reshape(-1, 1))
# Train on hfft-transformed windows with the one-hot category columns
# prepended to each sample (axis=1 concat of sparse-made-dense categories
# and the per-row hfft of X).
# NOTE(review): `hfft` is presumably numpy.fft.hfft imported elsewhere —
# confirm. `.todense()` returns np.matrix; np.concatenate accepts it, but
# `.toarray()` would avoid the matrix subclass.
history = fft_model.fit(
    x=np.concatenate([cat_train_val_one_hot.todense(), np.apply_along_axis(hfft, 1, X_train_val)], axis=1),
    y=y_train_val,
    # class_weight={0: zero_weight, 1: one_weight},
    batch_size=128,
    epochs=100,
    shuffle=True,
    # Validation data gets the identical category+hfft preprocessing.
    validation_data=(np.concatenate([cat_test_one_hot.todense(), np.apply_along_axis(hfft, 1, X_test)], axis=1),
                     y_test),
    # Early stopping halts after 11 stagnant epochs; the best weights are
    # NOT restored in memory (restore_best_weights=False) — they are
    # recovered from the ModelCheckpoint file below instead.
    callbacks=[keras.callbacks.EarlyStopping(monitor='val_loss', mode='min', patience=11, restore_best_weights=False),
               keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, mode='auto'),
               tf.keras.callbacks.ModelCheckpoint(filepath='/content/drive/MyDrive/AN2DL_time_series_data/hfft_model',
                                                  monitor='val_loss',
                                                  mode='min',
                                                  save_best_only=True)]
).history
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment