import numpy as np
from random import randint
import tensorflow as tf
from tensorflow.keras.layers import Input, LSTM, RepeatVector, TimeDistributed, Dense
from functools import lru_cache
import time

NUM_SERIES = 1000
TRAIN_SET_SIZE = 1000
NUM_COL = 28
WINDOW_LEN = 14

def main():
    # idx_dataset / index mode: memory efficient but slow
    # plain_dataset / plain mode: memory inefficient but fast
    custom_fit(idx_dataset(), train_steps=2000, mode='idx')
    custom_fit(plain_dataset(), train_steps=2000, mode='plain')
    custom_fit(idx_dataset(), train_steps=2000, mode='idx')
    custom_fit(plain_dataset(), train_steps=2000, mode='plain')

    """
    Results on my machine (py 3.8.2, tf 2.5.0, cuda 11, gpu: Quadro T1000)
    mode: idx: rate: 105 steps/s
    mode: plain: rate: 176 steps/s
    mode: idx: rate: 105 steps/s
    mode: plain: rate: 183 steps/s
    """

@lru_cache
def series_ragged_tensor():
    """Creates a tf.RaggedTensor from the `data_series()` list"""
    return tf.RaggedTensor.from_row_lengths(
        tf.concat(data_series(), axis=0),
        [ser.shape[0] for ser in data_series()])
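
# Example of the re-split: tf.RaggedTensor.from_row_lengths([1., 2., 3., 4., 5.], [2, 3])
# gives [[1., 2.], [3., 4., 5.]], so rt[i] recovers the i-th series without
# padding the shorter ones to a common length.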

@lru_cache
def plain_dataset():
    """Generates a dataset of `TRAIN_SET_SIZE` random sub-sequences from the data series, stored as full windows"""
    arr = np.zeros((TRAIN_SET_SIZE, WINDOW_LEN, NUM_COL))
    for ridx in range(arr.shape[0]):
        dataset_index = randint(0, NUM_SERIES - 1)
        range_start = randint(0, data_series()[dataset_index].shape[0] - WINDOW_LEN)
        range_end = range_start + WINDOW_LEN
        arr[ridx] = data_series()[dataset_index][range_start:range_end]
    return tf.data.Dataset.from_tensor_slices(arr).batch(32).repeat()

@lru_cache
def idx_dataset():
    """Generates a dataset of `TRAIN_SET_SIZE` random sub-sequences from the data series, encoded as indexes.
    Each sub-sequence is encoded as [data_series_index, window_start_row_index, window_length].
    """
    arr = np.zeros((TRAIN_SET_SIZE, 3), dtype=np.int32)
    for ridx in range(arr.shape[0]):
        dataset_index = randint(0, len(data_series()) - 1)
        range_start = randint(0, data_series()[dataset_index].shape[0] - WINDOW_LEN)
        arr[ridx] = np.array([dataset_index, range_start, WINDOW_LEN])
    return tf.data.Dataset.from_tensor_slices(arr).batch(32).repeat()
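
# The memory trade-off in numbers: plain_dataset materializes
# TRAIN_SET_SIZE * WINDOW_LEN * NUM_COL = 1000 * 14 * 28 float64 values (~3 MB),
# while idx_dataset holds TRAIN_SET_SIZE * 3 int32 values (~12 KB); the gap
# grows linearly with TRAIN_SET_SIZE and WINDOW_LEN.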

@lru_cache
def data_series():
    """Generates a list of `NUM_SERIES` data series, each with 1000-1200 rows and `NUM_COL` columns"""
    series_list = []
    for idx in range(NUM_SERIES):
        series_len = 1000 + randint(0, 200)
        series = ((np.arange(series_len * NUM_COL) / (series_len * NUM_COL)) * 2) - 1
        series = series.reshape((series_len, NUM_COL))
        series_list.append(series.astype(np.float32))
    return series_list
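
# Rough footprint of the raw data: ~1000 series * ~1100 rows * 28 cols * 4 bytes
# (float32) comes to roughly 120 MB, which is why duplicating windows out of it
# (plain mode) is the memory-hungry variant.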

def custom_fit(train_dataset, train_steps=100, mode='plain'):
    opt = tf.keras.optimizers.Adam(0.001)
    loss_fun = tf.losses.MeanSquaredError(name='loss')
    rt = series_ragged_tensor()
    model = autoenc_model()

    @tf.function
    def resolve_index_batch(idx_list):
        return tf.map_fn(
            fn=lambda idx: tf.slice(rt[idx[0]], [idx[1], 0], [idx[2], -1]),
            elems=idx_list,
            fn_output_signature=tf.float32)
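
    # resolve_index_batch turns each [series_index, window_start, window_length]
    # row into an actual window: rt[idx[0]] selects one series from the ragged
    # tensor, and tf.slice cuts `idx[2]` rows starting at row `idx[1]`, keeping
    # all columns (size -1).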

    @tf.function
    def train_step(batch):
        if mode == 'idx':
            # convert the batch of index triples into a batch of series windows
            batch = resolve_index_batch(batch)

        # train against the reversed time series
        batch_flip = tf.reverse(batch, [1])

        with tf.GradientTape() as tape:
            m = model(batch, training=True)
            loss = loss_fun(batch_flip, m)

        grad = tape.gradient(loss, model.trainable_weights)
        opt.apply_gradients(zip(grad, model.trainable_weights))
        return loss
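
    # A note on the target: reversing the window (batch_flip) means the decoder
    # emits the most recent timestep first, a common choice for sequence
    # autoencoders since the first reconstruction step then only has to echo
    # the last input rather than the oldest one.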

    t = []
    for step, batch_train in zip(range(train_steps), iter(train_dataset)):
        step_loss = train_step(batch_train)
        if step % 10 == 0:
            print(f"step: {step}, loss: {step_loss:8.6f}", end='\r')
            t.append(time.time())

    # determine the median wall-clock time for one training step
    # (timestamps are taken every 10 steps, hence the division by 10)
    t_diff = [t2 - t1 for t1, t2 in zip(t, t[1:])]
    t_median = np.median(t_diff) / 10.0

    # print the median training rate
    rate = int(1.0 / t_median)
    print(f"\nmode: {mode}: rate: {rate} steps/s")

@lru_cache
def autoenc_model():
    enc_param = {"u_lstm": [50, 10]}

    seq_in = Input(shape=(WINDOW_LEN, NUM_COL))
    seq_code = enc_comp(enc_param)(seq_in)
    seq_out_flip = dec_comp(enc_param)(seq_code)
    return tf.keras.Model([seq_in], [seq_out_flip])
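
# Shape walk-through: (batch, 14, 28) input -> encoder -> (batch, 10) code
# -> decoder -> (batch, 14, 28) reconstruction of the reversed window.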

def enc_comp(param):
    seq_in = Input(shape=(WINDOW_LEN, NUM_COL), name="seq_in")
    m = seq_in
    for units in param['u_lstm'][:-1]:
        m = LSTM(units, activation='tanh', recurrent_activation='sigmoid', return_sequences=True)(m)
    m = LSTM(param['u_lstm'][-1], activation='tanh', recurrent_activation='sigmoid', return_sequences=False)(m)
    return tf.keras.Model(inputs=[seq_in], outputs=[m], name='encoder')

def dec_comp(param):
    u_lstm = list(reversed(param['u_lstm']))

    decoder_input = Input(shape=(u_lstm[0],), name="decoder_input")
    m = decoder_input
    m = RepeatVector(WINDOW_LEN)(m)
    for units in u_lstm:
        m = LSTM(units, activation='tanh', recurrent_activation='sigmoid', return_sequences=True)(m)
    m = TimeDistributed(Dense(NUM_COL))(m)
    return tf.keras.Model(inputs=[decoder_input], outputs=[m], name='decoder')

if __name__ == "__main__":
    main()