Skip to content

Instantly share code, notes, and snippets.

@Zwackelmann
Last active Aug 17, 2021
Embed
What would you like to do?
import numpy as np
from random import randint
import tensorflow as tf
from tensorflow.keras.layers import Input, LSTM, RepeatVector, TimeDistributed, Dense
from functools import lru_cache
import time
NUM_SERIES = 1000
TRAIN_SET_SIZE = 1000
NUM_COL = 28
WINDOW_LEN = 14
def main():
# idx_dataset/ index mode: memory efficient but slow
# plain_dataset/ plain mode: memory inefficient but fast
custom_fit(idx_dataset(), train_steps=2000, mode='idx')
custom_fit(plain_dataset(), train_steps=2000, mode='plain')
custom_fit(idx_dataset(), train_steps=2000, mode='idx')
custom_fit(plain_dataset(), train_steps=2000, mode='plain')
"""
Results on my machine (py 3.8.2, tf 2.5.0, cuda 11, gpu: Quadro T1000)
mode: idx: rate: 105 steps/s
mode: plain: rate: 176 steps/s
mode: idx: rate: 105 steps/s
mode: plain: rate: 183 steps/s
"""
@lru_cache
def series_ragged_tensor():
"""Creates a tf.RaggedTensor from the `data_series()`"""
return tf.RaggedTensor.from_row_lengths(tf.concat(data_series(), axis=0), [ser.shape[0] for ser in data_series()])
@lru_cache
def plain_dataset():
"""Generates a dataset of `DS_SIZE` random sub-sequences from the data series dataset"""
arr = np.zeros((TRAIN_SET_SIZE, WINDOW_LEN, NUM_COL))
for ridx in range(arr.shape[0]):
dataset_index = randint(0, NUM_SERIES - 1)
range_start = randint(0, data_series()[dataset_index].shape[0] - (NUM_COL + 1))
range_end = range_start + WINDOW_LEN
arr[ridx] = data_series()[dataset_index][range_start:range_end]
return tf.data.Dataset.from_tensor_slices(arr).batch(32).repeat()
@lru_cache
def idx_dataset():
"""Generates a dataset of `DS_SIZE` random sub-sequences from the data series dataset encoded as indexes.
Each subsequence is encoded as [data_sequence_index, sequence_start_row_index, sequence_size]
"""
arr = np.zeros((TRAIN_SET_SIZE, 3), dtype=np.int32)
for ridx in range(arr.shape[0]):
dataset_index = randint(0, len(data_series()) - 1)
range_start = randint(0, data_series()[dataset_index].shape[0] - (NUM_COL + 1))
arr[ridx] = np.array([dataset_index, range_start, WINDOW_LEN])
return tf.data.Dataset.from_tensor_slices(arr).batch(32).repeat()
@lru_cache
def data_series():
"""Generates a list of data series with `NUM_SERIES` items with 1000-1200 rows and `NUM_COL` columns each"""
series_list = []
for idx in range(NUM_SERIES):
series_len = 1000 + randint(0, 200)
series = ((np.arange((series_len * NUM_COL)) / (series_len * NUM_COL)) * 2) - 1
series = series.reshape((series_len, NUM_COL))
series_list.append(series.astype(np.float32))
return series_list
def custom_fit(train_dataset, train_steps=100, mode='plain'):
opt = tf.keras.optimizers.Adam(0.001)
loss_fun = tf.losses.MeanSquaredError(name='loss')
rt = series_ragged_tensor()
model = autoenc_model()
@tf.function
def resolve_index_batch(idx_list):
return tf.map_fn(fn=lambda idx: tf.slice(rt[idx[0]], [idx[1], 0], [idx[2], -1]), elems=idx_list, fn_output_signature=tf.float32)
@tf.function
def train_step(batch):
if mode == 'idx':
# apply the conversion from indexes batch to data_series batch
batch = resolve_index_batch(batch)
# train on reversed time series
batch_flip = tf.reverse(batch, [1])
with tf.GradientTape() as tape:
m = model(batch, training=True)
loss = loss_fun(batch_flip, m)
grad = tape.gradient(loss, model.trainable_weights)
opt.apply_gradients(zip(grad, model.trainable_weights))
return loss
t = []
for step, batch_train in zip(range(train_steps), iter(train_dataset)):
step_loss = train_step(batch_train)
if step % 10 == 0:
print(f"step: {step}, loss: {step_loss:8.6f}", end='\r')
t.append(time.time())
# determine median calculation time for one training step
t_diff = [t2-t1 for t1, t2 in zip(t, t[1:])]
t_median = np.median(t_diff) / 10.0
# print median training rate
rate = int(1.0 / t_median)
print(f"\nmode: {mode}: rate: {rate} steps/s")
@lru_cache
def autoenc_model():
enc_param = {"u_lstm": [50, 10]}
seq_in = Input(shape=(WINDOW_LEN, NUM_COL))
seq_code = enc_comp(enc_param)(seq_in)
seq_out_flip = dec_comp(enc_param)(seq_code)
return tf.keras.Model([seq_in], [seq_out_flip])
def enc_comp(param):
seq_in = Input(shape=(WINDOW_LEN, NUM_COL), name="seq_in")
m = seq_in
for units in param['u_lstm'][:-1]:
m = LSTM(units, activation='tanh', recurrent_activation='sigmoid', return_sequences=True)(seq_in)
m = LSTM(param['u_lstm'][-1], activation='tanh', recurrent_activation='sigmoid', return_sequences=False)(m)
return tf.keras.Model(inputs=[seq_in], outputs=[m], name='encoder')
def dec_comp(param):
u_lstm = list(reversed(param['u_lstm']))
decoder_input = Input(shape=param['u_lstm'][-1], name="decoder_input")
m = decoder_input
m = RepeatVector(WINDOW_LEN)(m)
for units in u_lstm:
m = LSTM(units, activation='tanh', recurrent_activation='sigmoid', return_sequences=True)(m)
m = TimeDistributed(Dense(NUM_COL))(m)
return tf.keras.Model(inputs=[decoder_input], outputs=[m], name='decoder')
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment