@saitejamalyala
Created August 11, 2021 09:40
This gist shows how to pass an extra parameter (additional information) into the loss function of a Keras model while training with model.fit.
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras import layers
from tensorflow.keras import models
from tensorflow.keras.regularizers import l1

from rosbag2numpy.config import params

def euclidean_distance_loss(y_true, y_pred):
    """
    Euclidean distance loss
    https://en.wikipedia.org/wiki/Euclidean_distance
    :param y_true: TensorFlow tensor
    :param y_pred: TensorFlow tensor of the same shape as y_true
    :return: tensor of per-point Euclidean distances
    """
    # original euclidean distance loss = K.sqrt(K.sum(K.square(y_pred - y_true), axis=-1))
    # loss = K.mean(K.sqrt(K.sum(K.square(y_pred - y_true), axis=-1)))
    loss = K.sqrt(K.sum(K.square(y_pred - y_true), axis=-1))
    return loss
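
# Quick sanity check (not part of the original gist; shapes are assumed):
# with y_true = tf.zeros((25, 2)) and y_pred = tf.ones((25, 2)), every point
# differs by (1, 1), so euclidean_distance_loss returns a (25,)-shaped tensor
# filled with sqrt(2) ~= 1.414, which Keras then reduces to a scalar loss.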

def endpoint_loss(y_true, y_pred):  # also known as final displacement error
    # Euclidean distance between the last elements of the true and predicted paths
    loss = K.sqrt(K.sum(K.square(y_pred[-1, :] - y_true[-1, :])))
    return loss

def costmap_loss_wrapper(costmap):
    # drop the trailing channel axis: (H, W, 1) -> (H, W)
    costmap = K.squeeze(costmap, axis=len(costmap.shape) - 1)

    def costmap_loss(y_true, y_pred):
        """
        Costmap loss
        https://en.wikipedia.org/wiki/Cost_map
        :param costmap: TensorFlow tensor (captured from the wrapper)
        :param y_true: TensorFlow tensor
        :param y_pred: TensorFlow tensor
        :return: float
        """
        # leftover from an earlier variant, kept for reference:
        # init_diff_sqr = K.square(gridmap_idx - tf_init_path[5])
        # init_dist = K.sqrt(K.sum(init_diff_sqr, axis=1))
        # init_inv_dist = tf.math.reciprocal_no_nan(tf.math.pow(init_dist, 0.1))
        valid_indices = tf.where(costmap > 0.35)
        valid_costs = tf.gather_nd(costmap, valid_indices)
        valid_costs = tf.cast(valid_costs, dtype=tf.float32)
        allcosts = tf.constant(0.0)
        valid_indices = tf.cast(valid_indices, dtype=tf.float32)
        for i in range(y_pred.shape[0]):
            # distance from this predicted point to all valid indices
            pred_dist = K.sqrt(K.sum(K.square(valid_indices - y_pred[i]), axis=1))
            inv_pred_dist = tf.math.reciprocal_no_nan(tf.math.pow(pred_dist, 0.1))
            # cost for one point in the path
            cost_for_point = tf.reduce_sum(tf.multiply(inv_pred_dist, valid_costs))
            pred_allcosts = tf.add(allcosts, cost_for_point) if i == 0 else tf.add(pred_allcosts, cost_for_point)
        for j in range(y_true.shape[0]):
            # distance from this ground-truth point to all valid indices
            # (fixed: index with the loop variable j, not i)
            true_dist = K.sqrt(K.sum(K.square(valid_indices - y_true[j]), axis=1))
            inv_true_dist = tf.math.reciprocal_no_nan(tf.math.pow(true_dist, 0.1))
            # cost for one point in the path
            cost_for_point = tf.reduce_sum(tf.multiply(inv_true_dist, valid_costs))
            true_allcosts = tf.add(allcosts, cost_for_point) if j == 0 else tf.add(true_allcosts, cost_for_point)
        return tf.abs(tf.subtract(pred_allcosts, true_allcosts))

    return costmap_loss
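
# Usage sketch (an assumption mirroring the compile() call below): the wrapper
# binds the costmap tensor so the returned closure keeps the (y_true, y_pred)
# signature Keras expects, e.g.
#   cm_loss = costmap_loss_wrapper(ip_gridmap)
#   model.compile(loss=cm_loss, ...)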

# Build and compile a neural network that predicts the next points of a
# trajectory (TensorFlow / Keras).
def nn(full_skip=False,params=None):
    # Grid map input
    ip_gridmap = layers.Input(shape=(1536, 1536, 1))
    # Other inputs
    ip_grid_org_res = layers.Input(shape=(3,), name="Grid_origin_res")
    ip_left_bnd = layers.Input(shape=(25, 2), name="Left_boundary")
    ip_right_bnd = layers.Input(shape=(25, 2), name="Right_boundary")
    ip_car_odo = layers.Input(shape=(3,), name="Car_loc")
    ip_init_path = layers.Input(shape=(25, 2), name="Initial_path")

    # CNN - branch 1
    # 1x1 conv
    # x_A = layers.Conv2D(3, kernel_size=1, strides=1)(ip_gridmap)
    # Block 1
    x_A = layers.Conv2D(32, kernel_size=5, strides=2)(ip_gridmap)
    x_A = layers.LeakyReLU()(x_A)
    x_A = layers.BatchNormalization()(x_A)
    x_A = layers.AvgPool2D(pool_size=(4, 4))(x_A)
    # Block 2
    x_A = layers.Conv2D(64, kernel_size=3, strides=1)(x_A)
    x_A = layers.LeakyReLU()(x_A)
    x_A = layers.BatchNormalization()(x_A)
    x_A = layers.AvgPool2D(pool_size=(2, 2))(x_A)
    # 1x1 blocks
    x_A = layers.Conv2D(128, kernel_size=3, strides=1)(x_A)
    x_A = layers.ReLU()(x_A)
    x_A = layers.BatchNormalization()(x_A)
    x_A = layers.Conv2D(16, kernel_size=1, strides=1)(x_A)
    x_A = layers.Conv2D(8, kernel_size=1, strides=1)(x_A)
    x_A = layers.Conv2D(2, kernel_size=1, strides=1)(x_A)
    x_A = layers.Flatten()(x_A)

    # ip_filedetails = layers.Input
    # branch 5
    conc_grid_orgres_car_odo = layers.concatenate([ip_grid_org_res, ip_car_odo])
    # reshape paths from (25, 2) to flat (50,) vectors
    reshape_init_path = layers.Reshape((50,))(ip_init_path)
    reshape_left_bnd = layers.Reshape((50,))(ip_left_bnd)
    reshape_right_bnd = layers.Reshape((50,))(ip_right_bnd)
    # concatenate features
    concat_feat = layers.concatenate([x_A, reshape_init_path, reshape_left_bnd, reshape_right_bnd, conc_grid_orgres_car_odo])
    # Dense network
    # Block 4
    output = layers.Dense(16, activation='relu')(concat_feat)
    output = layers.BatchNormalization()(output)
    # output = layers.ReLU()(output)
    output = layers.Dropout(params.get("drop_rate")["dense_rate1"])(output)
    # Block 5
    """
    output = layers.Dense(32, activation='linear')(output)
    output = layers.BatchNormalization()(output)
    output = layers.ReLU()(output)
    """
    # output = layers.Dropout(params["drop_rate"]["dense_rate2"])(output)
    # Block 5
    """
    output = layers.Dense(64, activation='linear')(output)
    output = layers.BatchNormalization()(output)
    output = layers.ReLU()(output)
    """
    # output = layers.Dropout(params["drop_rate"]["dense_rate3"])(output)
    # Block 5
    output = layers.Dense(50, activation='relu', kernel_regularizer=l1(0.01))(output)
    output = layers.Dropout(params.get("drop_rate")["dense_rate2"])(output)
    if full_skip:
        # Block 6-fs
        output = layers.add([output, reshape_init_path])
    output = layers.Dense(50, activation='linear')(output)
    # output
    output = layers.Reshape((25, 2))(output)
    nn_fun = models.Model(
        inputs=[ip_gridmap, ip_grid_org_res, ip_left_bnd, ip_right_bnd, ip_car_odo, ip_init_path],
        outputs=output,
    )
    nn_fun.compile(
        # _get_optimizer: helper assumed to be defined elsewhere in the project
        optimizer=_get_optimizer(params.get("optimizer"), lr=params.get("lr")),
        # costmap_loss_wrapper binds the grid-map input so the loss can use it
        loss=[euclidean_distance_loss, endpoint_loss, costmap_loss_wrapper(ip_gridmap)],
        loss_weights=params.get("loss_weights"),
        metrics=params.get("metric"),
    )
    nn_fun.summary(line_length=120)
    print(f'Losses: {nn_fun.loss}, Loss weights: {params.get("loss_weights")}')
    return nn_fun
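
# Example training call (a sketch; the arrays and the "batch_size"/"epochs"
# config keys below are hypothetical placeholders, not from the original gist):
# model = nn(full_skip=True, params=params)
# model.fit(
#     x=[gridmaps, grid_org_res, left_bnds, right_bnds, car_odos, init_paths],
#     y=target_paths,
#     batch_size=params.get("batch_size"),
#     epochs=params.get("epochs"),
# )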
@saitejamalyala (Author):
Use a wrapper with one or more arguments to make the extra data available to the loss function.

ref: keras-team/keras#2121 (comment)
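
Distilled, the pattern is just a closure (a minimal sketch; loss_wrapper, the weighted-MSE body, and extra_input_layer are illustrative names, not from the gist):

import tensorflow as tf

def loss_wrapper(extra_tensor):
    # extra_tensor is captured by the closure, so the inner function keeps
    # the (y_true, y_pred) signature that Keras requires
    def loss(y_true, y_pred):
        weights = tf.cast(extra_tensor, tf.float32)
        return tf.reduce_mean(weights * tf.square(y_true - y_pred))
    return loss

# model.compile(optimizer="adam", loss=loss_wrapper(extra_input_layer))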
