Created
August 11, 2021 09:40
-
-
Save saitejamalyala/b3a7602c1369f7d6763651dee4c3ae91 to your computer and use it in GitHub Desktop.
This gist provides an example of how to use an additional parameter (extra information) to calculate the loss in a Keras model while using model.fit
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from tensorflow.keras import backend as K | |
import tensorflow as tf | |
from tensorflow.keras import layers | |
from tensorflow.keras import models | |
import tensorflow as tf | |
from tensorflow.keras.regularizers import l1,l1_l2,l2 | |
from tensorflow.python.keras.regularizers import L1 | |
from rosbag2numpy.config import params | |
def euclidean_distance_loss(y_true, y_pred):
    """
    Euclidean (L2) distance between prediction and target.

    https://en.wikipedia.org/wiki/Euclidean_distance

    :param y_true: TensorFlow tensor
    :param y_pred: TensorFlow tensor of the same shape as y_true
    :return: TensorFlow tensor of distances; only the last axis is reduced and
        no mean is taken over the remaining axes
    """
    squared_error = K.square(y_pred - y_true)
    # Reduce the last axis only, so one distance is returned per leading index.
    return K.sqrt(K.sum(squared_error, axis=-1))
def endpoint_loss(y_true, y_pred):  # or final displacement error
    """
    Endpoint loss (final displacement error).

    Euclidean distance between the last point of the predicted path and the
    last point of the target path.

    :param y_true: TensorFlow tensor whose last two axes are (points, coordinates)
    :param y_pred: TensorFlow tensor of the same shape as y_true
    :return: TensorFlow tensor with the last two axes reduced away (a scalar
        for a single unbatched path, one value per path when batched)
    """
    # Select along the second-to-last axis so the final path point is used even
    # when a leading batch axis is present. Indexing with `[-1, :]` would pick
    # the last sample of the batch instead of the last point of each path.
    final_offset = y_pred[..., -1, :] - y_true[..., -1, :]
    return K.sqrt(K.sum(K.square(final_offset), axis=-1))
def costmap_loss_wrapper(costmap):
    """
    Build a loss function that has access to ``costmap``.

    Keras calls a loss as ``loss(y_true, y_pred)``, so the extra tensor is
    captured in a closure instead of being passed as an argument.

    :param costmap: TensorFlow tensor; its last axis is squeezed away
    :return: function ``costmap_loss(y_true, y_pred)``
    """
    # Drop the trailing axis of the costmap (same axis as len(shape) - 1).
    costmap = K.squeeze(costmap, axis=-1)

    def _path_cost(path, cell_indices, cell_costs):
        """Sum, over every point of ``path``, the cell costs weighted by
        inverse distance (1 / dist**0.1) from that point to each cell."""
        total = tf.constant(0.0)
        for point_idx in range(path.shape[0]):
            # Distance from this path point to all selected costmap cells.
            dist = K.sqrt(
                K.sum(K.square(cell_indices - path[point_idx]), axis=1)
            )
            # reciprocal_no_nan yields 0 where the distance is 0.
            inv_dist = tf.math.reciprocal_no_nan(tf.math.pow(dist, 0.1))
            # Cost contributed by one point of the path.
            cost_for_point = tf.reduce_sum(tf.multiply(inv_dist, cell_costs))
            total = tf.add(total, cost_for_point)
        return total

    def costmap_loss(y_true, y_pred):
        """
        Costmap loss.

        Absolute difference between the accumulated costmap cost of the
        predicted path and that of the target path. Only costmap cells with a
        value above 0.35 contribute.

        :param y_true: TensorFlow tensor, indexed point by point along axis 0
        :param y_pred: TensorFlow tensor, indexed point by point along axis 0
        :return: scalar TensorFlow tensor
        """
        valid_indices = tf.where(costmap > 0.35)
        valid_costs = tf.cast(
            tf.gather_nd(costmap, valid_indices), dtype=tf.float32
        )
        # Indices are cast to float so they can be subtracted from path points.
        valid_indices = tf.cast(valid_indices, dtype=tf.float32)

        # Both paths go through the same helper; the previous hand-written
        # second loop indexed y_true with the loop variable of the first loop.
        pred_allcosts = _path_cost(y_pred, valid_indices, valid_costs)
        true_allcosts = _path_cost(y_true, valid_indices, valid_costs)
        return tf.abs(tf.subtract(pred_allcosts, true_allcosts))

    return costmap_loss
#code to train a neural network to predict the next point in a trajectory in tensorflow | |
def nn(full_skip=False, params=None):
    """
    Build and compile the path-prediction Keras model.

    The model takes six inputs (grid map, grid origin/resolution, left and
    right boundaries, car location, initial path), runs the grid map through a
    small CNN, concatenates the flattened CNN features with the remaining
    inputs and maps them through dense layers to a (25, 2) output.

    :param full_skip: if True, add the flattened initial path to the dense
        output (skip connection) and apply one more linear Dense(50) layer
    :param params: mapping read with the keys "drop_rate" (containing
        "dense_rate1" and "dense_rate2"), "optimizer", "lr", "loss_weights"
        and "metric"; when None, ``rosbag2numpy.config.params`` is used
    :return: compiled ``models.Model``
    """
    if params is None:
        # The signature allows omitting `params`, but every lookup below needs
        # a mapping. Presumably the module-level config imported at the top of
        # the file is the intended default -- TODO confirm.
        from rosbag2numpy.config import params as default_params

        params = default_params

    # Grid Map input
    ip_gridmap = layers.Input(shape=(1536, 1536, 1))

    # Other inputs
    ip_grid_org_res = layers.Input(shape=(3,), name="Grid_origin_res")
    ip_left_bnd = layers.Input(shape=(25, 2), name="Left_boundary")
    ip_right_bnd = layers.Input(shape=(25, 2), name="Right_boundary")
    ip_car_odo = layers.Input(shape=(3,), name="Car_loc")
    ip_init_path = layers.Input(shape=(25, 2), name="Initial_path")

    # CNN branch on the grid map
    # Block 1
    x_A = layers.Conv2D(32, kernel_size=5, strides=2)(ip_gridmap)
    x_A = layers.LeakyReLU()(x_A)
    x_A = layers.BatchNormalization()(x_A)
    x_A = layers.AvgPool2D(pool_size=(4, 4))(x_A)

    # Block 2
    x_A = layers.Conv2D(64, kernel_size=3, strides=1)(x_A)
    x_A = layers.LeakyReLU()(x_A)
    x_A = layers.BatchNormalization()(x_A)
    x_A = layers.AvgPool2D(pool_size=(2, 2))(x_A)

    # Block 3: 3x3 conv, then 1x1 convs reducing channels 128 -> 16 -> 8 -> 2
    x_A = layers.Conv2D(128, kernel_size=3, strides=1)(x_A)
    x_A = layers.ReLU()(x_A)
    x_A = layers.BatchNormalization()(x_A)
    x_A = layers.Conv2D(16, kernel_size=1, strides=1)(x_A)
    x_A = layers.Conv2D(8, kernel_size=1, strides=1)(x_A)
    x_A = layers.Conv2D(2, kernel_size=1, strides=1)(x_A)
    x_A = layers.Flatten()(x_A)

    # Grid origin/resolution and car location are joined into one vector
    conc_grid_orgres_car_odo = layers.concatenate([ip_grid_org_res, ip_car_odo])

    # Flatten the (25, 2) paths to 50-vectors
    reshape_init_path = layers.Reshape((50,))(ip_init_path)
    reshape_left_bnd = layers.Reshape((50,))(ip_left_bnd)
    reshape_right_bnd = layers.Reshape((50,))(ip_right_bnd)

    # Concatenate all features
    concat_feat = layers.concatenate(
        [
            x_A,
            reshape_init_path,
            reshape_left_bnd,
            reshape_right_bnd,
            conc_grid_orgres_car_odo,
        ]
    )

    drop_rates = params.get("drop_rate")

    # Dense network
    # Block 4
    output = layers.Dense(16, activation='relu')(concat_feat)
    output = layers.BatchNormalization()(output)
    output = layers.Dropout(drop_rates["dense_rate1"])(output)

    # NOTE: intermediate Dense(32) and Dense(64) blocks (linear + batch norm +
    # ReLU, with optional dropout) were disabled in the original and are not
    # part of the model.

    # Block 5
    output = layers.Dense(50, activation='relu', kernel_regularizer=l1(0.01))(output)
    output = layers.Dropout(drop_rates["dense_rate2"])(output)

    if full_skip:
        # Block 6 (full skip): residual connection from the initial path
        output = layers.add([output, reshape_init_path])
        output = layers.Dense(50, activation='linear')(output)

    # Output: back to 25 points with 2 coordinates each
    output = layers.Reshape((25, 2))(output)

    nn_fun = models.Model(
        inputs=[
            ip_gridmap,
            ip_grid_org_res,
            ip_left_bnd,
            ip_right_bnd,
            ip_car_odo,
            ip_init_path,
        ],
        outputs=output,
    )

    # NOTE(review): `_get_optimizer` is not defined in the visible part of this
    # file; it is expected to be provided elsewhere.
    # NOTE(review): three losses are passed for a single model output; confirm
    # that the Keras version in use applies all of them with `loss_weights`.
    # The grid-map input is handed to the loss through the wrapper's closure.
    nn_fun.compile(
        optimizer=_get_optimizer(params.get("optimizer"), lr=params.get("lr")),
        loss=[
            euclidean_distance_loss,
            endpoint_loss,
            costmap_loss_wrapper(ip_gridmap),
        ],
        loss_weights=params.get("loss_weights"),
        metrics=params.get("metric"),
    )

    nn_fun.summary(line_length=120)
    print(f'Losses:{nn_fun.loss},Loss weights : {params.get("loss_weights")}')
    return nn_fun
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Use a wrapper function that takes one or more extra arguments to make them available to the loss function.
ref: keras-team/keras#2121 (comment)