Recurrent Network Models for Human Dynamics
import tensorflow as tf
from tensorflow.python.ops import clip_ops
from tensorflow.python.ops import logging_ops
# I modified the TensorFlow LSTM implementation so that it also returns the internal values.
import model_runner.common.rnn_cellv3 as rnncell
class Model():
    def __init__(self, params, is_training=True):
        self.is_training = tf.placeholder(tf.bool)
        self.is_forcasting = tf.placeholder(tf.bool)
        self.output_keep_prob = tf.placeholder(tf.float32)
        self.std_noise = tf.placeholder(tf.float32)

        inp_sequence_length = params['inp_sequence_length']
        out_sequence_length = params['out_sequence_length']
        num_layers = params['nlayer']
        rnn_size = params['n_hidden']
        grad_clip = params['grad_clip']

        cell_lst = []
        # Dropout could be applied between layers; keep the loop format for that.
        for i in range(num_layers):
            cell = rnncell.ModifiedLSTMCell(rnn_size, forget_bias=1,
                                            initializer=tf.contrib.layers.xavier_initializer(),
                                            num_proj=None, is_training=self.is_training)
            cell_lst.append(cell)
        cell = rnncell.MultiRNNCell(cell_lst)
        self.cell = cell

        NOUT = params['n_output']
        self.input_data = tf.placeholder(dtype=tf.float32,
                                         shape=[params["batch_size"], inp_sequence_length, params['n_input']])
        self.target_data = tf.placeholder(dtype=tf.float32,
                                          shape=[params["batch_size"], out_sequence_length, params["n_output"]])
        self.initial_state = cell.zero_state(batch_size=params["batch_size"], dtype=tf.float32)
        input_shape = self.input_data.get_shape()
        output_shape = self.target_data.get_shape()

        # Noise is applied only during the training phase, and only if the std is greater than 0.
        # The noise level can be increased over training updates via the std_noise placeholder.
        if params["noise_std"] > 0.0:
            ran_noise = tf.random_normal(shape=input_shape, mean=0, stddev=self.std_noise)
            tmp_input = self.input_data + ran_noise
            self.input_data = tf.select(self.is_training, tmp_input, self.input_data)
        with tf.variable_scope('rnnlm'):
            output_pre_w1 = tf.get_variable("output_pre_w1", [params['n_input'], 500], initializer=tf.contrib.layers.xavier_initializer())
            output_pre_b1 = tf.get_variable("output_pre_b1", [500])
            output_pre_w2 = tf.get_variable("output_pre_w2", [500, 500], initializer=tf.contrib.layers.xavier_initializer())
            output_pre_b2 = tf.get_variable("output_pre_b2", [500])
            output_w1 = tf.get_variable("output_w1", [rnn_size, 500], initializer=tf.contrib.layers.xavier_initializer())
            output_b1 = tf.get_variable("output_b1", [500])
            output_w2 = tf.get_variable("output_w2", [500, 100], initializer=tf.contrib.layers.xavier_initializer())
            output_b2 = tf.get_variable("output_b2", [100])
            output_w3 = tf.get_variable("output_w3", [100, NOUT], initializer=tf.contrib.layers.xavier_initializer())
            output_b3 = tf.get_variable("output_b3", [NOUT])

        outputs = []
        state = self.initial_state
        seq_ls_internal = []
        # cell_output=0
        # During testing, only the first inp_sequence_length steps are seeded from the input;
        # the rest of the sequence is generated by feeding the output back in as the next input.
        with tf.variable_scope("rnnlm"):
            for time_step in range(out_sequence_length):
                if time_step > 0:
                    tf.get_variable_scope().reuse_variables()
                else:
                    cell_output = self.input_data[:, time_step, :]
                # Check whether we are still within the seeding time steps.
                inp = tf.cond(tf.greater_equal(time_step, inp_sequence_length),
                              lambda: cell_output,
                              lambda: self.input_data[:, time_step, :])
                inp = tf.reshape(inp, [-1, params["n_input"]])  # [batch_size, n_input]
                inp = tf.nn.relu(tf.add(tf.matmul(inp, output_pre_w1), output_pre_b1))
                inp = tf.add(tf.matmul(inp, output_pre_w2), output_pre_b2)
                # inp=tf.reshape(inp,[input_shape[0], 512])
                (cell_output, state, ls_internals) = cell(inp, state)  # apply the recurrent cell
                cell_output = tf.nn.relu(tf.add(tf.matmul(cell_output, output_w1), output_b1))
                cell_output = tf.nn.relu(tf.add(tf.matmul(cell_output, output_w2), output_b2))
                cell_output = tf.add(tf.matmul(cell_output, output_w3), output_b3)
                seq_ls_internal.append(ls_internals)
                outputs.append(cell_output)
        rnn_output = tf.reshape(tf.transpose(tf.pack(outputs), [1, 0, 2]), [-1, NOUT])
        self.seq_ls_internal = seq_ls_internal

        self.y = tf.reshape(self.target_data, [-1, params["n_output"]])
        self.final_output = rnn_output

        index = 0
        tmp = self.final_output - self.y
        loss = tf.nn.l2_loss(tmp)
        self.tvars = tf.trainable_variables()
        l2_reg = tf.reduce_sum([tf.nn.l2_loss(var) for var in self.tvars])
        l2_reg = tf.mul(l2_reg, 1e-4)
        self.cost = tf.reduce_mean(loss) + l2_reg
        self.final_state = state

        tf.scalar_summary('losses/total_loss', loss)
        self.lr = tf.Variable(0.0, trainable=False)

        # Count the total number of trainable parameters.
        total_parameters = 0
        for variable in self.tvars:
            # shape is an array of tf.Dimension
            shape = variable.get_shape()
            variable_parameters = 1
            for dim in shape:
                variable_parameters *= dim.value
            total_parameters += variable_parameters
        self.total_parameters = total_parameters
        grads, _ = tf.clip_by_global_norm(tf.gradients(self.cost, self.tvars), grad_clip)
        for grad in grads:
            grad_values = grad
            logging_ops.histogram_summary(grad.op.name + ':gradient', grad_values)
            logging_ops.histogram_summary(grad.op.name + ':gradient_norm', clip_ops.global_norm([grad_values]))
        optimizer = tf.train.AdamOptimizer(self.lr)
        self.train_op = optimizer.apply_gradients(zip(grads, self.tvars))
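

# ---------------------------------------------------------------------------
# Usage sketch (not part of the original gist): a minimal, hypothetical example
# of how this Model might be built and run for one training step with the legacy
# TF 0.x API used above. All concrete values below are illustrative assumptions;
# only the dictionary keys and model attributes come from the code above, and the
# zero arrays stand in for real motion-capture batches.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    import numpy as np

    params = {
        'batch_size': 32,             # assumed
        'inp_sequence_length': 50,    # number of seed frames (assumed)
        'out_sequence_length': 50,    # number of predicted frames (assumed equal to the seed length here)
        'n_input': 54,                # input feature size (assumed)
        'n_output': 54,               # kept equal to n_input so outputs can be fed back as inputs
        'n_hidden': 1000,             # LSTM size (assumed)
        'nlayer': 2,                  # number of stacked LSTM layers (assumed)
        'grad_clip': 5,               # gradient-norm clipping threshold (assumed)
        'noise_std': 0.0,             # set > 0.0 to enable the input-noise branch above
    }

    model = Model(params, is_training=True)

    with tf.Session() as sess:
        sess.run(tf.initialize_all_variables())
        sess.run(model.lr.assign(1e-4))  # learning rate (assumed)

        # Placeholder batches; replace with real data of the same shapes.
        x_batch = np.zeros((params['batch_size'], params['inp_sequence_length'], params['n_input']), dtype=np.float32)
        y_batch = np.zeros((params['batch_size'], params['out_sequence_length'], params['n_output']), dtype=np.float32)

        feed = {model.input_data: x_batch,
                model.target_data: y_batch,
                model.is_training: True,
                model.is_forcasting: False,
                model.output_keep_prob: 1.0,
                model.std_noise: params['noise_std']}  # only read when noise_std > 0
        _, cost = sess.run([model.train_op, model.cost], feed_dict=feed)
        print(cost)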