Skip to content

Instantly share code, notes, and snippets.

@iamrajee

iamrajee/NMS.py Secret

Created February 7, 2022 06:59
Show Gist options
  • Save iamrajee/d29959fba946a41d98e184d42036479e to your computer and use it in GitHub Desktop.
Save iamrajee/d29959fba946a41d98e184d42036479e to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# from math import nan
import rospy
from sensor_msgs.msg import Image
from darknet_ros_msgs.msg import BoundingBoxes
from std_msgs.msg import Header
from std_msgs.msg import String
from sensor_msgs.msg import PointCloud2
from sensor_msgs import point_cloud2
from geometry_msgs.msg import PoseArray
from geometry_msgs.msg import Pose, PoseStamped
import numpy as np
import tf
# tf2_ros
from tf import TransformListener
import cv2
from cv_bridge import CvBridge, CvBridgeError
bounding_boxes = []
yolo_img_pub, poseArray_pub, nms_bbox_pub = None, None, None
box_z = {}
def non_max_suppression_slow(boxes, overlapThresh):
# if there are no boxes, return an empty list
if len(boxes) == 0:
return []
# initialize the list of picked indexes
pick = []
# grab the coordinates of the bounding boxes
# x1 = boxes[:,0]
# y1 = boxes[:,1]
# x2 = boxes[:,2]
# y2 = boxes[:,3]
# x1 = [x[0] for x in boxes]
# y1 = [x[1] for x in boxes]
# x2 = [x[2] for x in boxes]
# y2 = [x[3] for x in boxes]
x1 = np.array([x.xmin for x in boxes])
y1 = np.array([x.ymin for x in boxes])
x2 = np.array([x.xmax for x in boxes])
y2 = np.array([x.ymax for x in boxes])
scores = np.array([x.probability for x in boxes])
# compute the area of the bounding boxes and sort the bounding
# boxes by the bottom-right y-coordinate of the bounding box
area = (x2 - x1 + 1) * (y2 - y1 + 1)
# idxs = np.argsort(y2)
idxs = np.argsort(area)
# keep looping while some indexes still remain in the indexes
# list
while len(idxs) > 0:
# grab the last index in the indexes list, add the index
# value to the list of picked indexes, then initialize
# the suppression list (i.e. indexes that will be deleted)
# using the last index
last = len(idxs) - 1
i = idxs[last]
pick.append(i)
suppress = [last]
# loop over all indexes in the indexes list
for pos in range(0, last):
# grab the current index
j = idxs[pos]
'''
Let say the new bounding box formed by intersection is b3, then to find area of b3 we need to find it parameters.
x_sort = sort(b1.xmin, b1.xmax, b2.xmin, b2.xmax)
y_sort = sort(b1.ymin, b1.ymax, b2.ymin, b2.ymax)
b3.xmin, b3.xmax = x_sort[1], x_sort[2] #i.e 2nd & 3rd element
b3.ymin, b3.ymax = y_sort[1], y_sort[2] #i.e 2nd & 3rd element
b3.area = (b3.xmax - b3.xmin)*(b3.ymax-b3.ymin)
Below is a alterate way to find b3 parameter
'''
# find the largest (x, y) coordinates for the start of
# the bounding box and the smallest (x, y) coordinates
# for the end of the bounding box
xx1 = max(x1[i], x1[j]) #x_sort[1] = maxima of b1.xmin & b2.xmin i.e 2nd element of sort(b1.xmin, b1.xmax, b2.xmin, b2.xmax)
yy1 = max(y1[i], y1[j]) #y_sort[1] = maxima of b1.ymin & b2.ymin i.e 2nd element of sort(b1.ymin, b1.ymax, b2.ymin, b2.ymax)
xx2 = min(x2[i], x2[j]) #x_sort[2] = minima of b1.xmax & b2.xmax i.e 3rd element of sort(b1.xmin, b1.xmax, b2.xmin, b2.xmax)
yy2 = min(y2[i], y2[j]) #y_sort[2] = minima of b1.ymax & b2.ymax i.e 3rd element of sort(b1.ymin, b1.ymax, b2.ymin, b2.ymax)
# compute the width and height of the bounding box
w = max(0, xx2 - xx1) # + 1)
h = max(0, yy2 - yy1) # + 1)
# compute the ratio of overlap between the computed
# bounding box and the bounding box in the area list
''' We are not using iou'''
# iou = float(w * h) / (area[i] + area[j] - float(w * h))
''' Instead we are using (intersection_area / area_of_smallest_box)'''
overlap = float(w * h) / area[j]
# if there is sufficient overlap, suppress the
# current bounding box
if overlap > overlapThresh:
suppress.append(pos)
# delete all indexes from the index list that are in the
# suppression list
idxs = np.delete(idxs, suppress)
# return only the bounding boxes that were picked
# return boxes[pick]
return [boxes[i] for i in pick]
#============================ Bounding Box data Callback ==========================#
def callback_boundingBox(data):
global bounding_boxes
global pc2_data
global nms_bbox_pub
bounding_boxes = data.bounding_boxes
# pick = non_max_suppression_slow(np.array(bounding_boxes).transpose, 0.3)
input_nms = bounding_boxes
# input_nms = np.array(bounding_boxes)
# print("input_nms:", input_nms)
pick = non_max_suppression_slow(input_nms, 0.5)
bounding_boxes = pick
# print("pick:", pick)
print("Differene in len- earlier:", len(input_nms), ", now:", len(pick))
bbox_msg = BoundingBoxes()
bbox_msg.header.frame_id = "detection"
bbox_msg.header.stamp = rospy.Time.now()
bbox_msg.image_header.frame_id = "stereo_camera2_link"
bbox_msg.image_header.stamp = rospy.Time.now()
bbox_msg.bounding_boxes = bounding_boxes
nms_bbox_pub.publish(bbox_msg)
# for box in data.bounding_boxes:
# if pc2_data == None:
# z=None
# else:
# get_z(round((box.xmin+box.xmax)/2),round((box.ymin+box.ymax)/2))
# rospy.loginfo(
# "ID: {}, Class: {}, Probability: {}, Xmin: {}, Xmax: {} Ymin: {}, Ymax: {}".format(
# box.id, box.Class, box.probability, box.xmin, box.xmax, box.ymin, box.ymax
# )
# )
# print(type(box))+
# print("------------------------------")
pc2_data = None
temp_drone_x = None
temp_drone_y = None
temp_drone_z = None
#============================ PointCloud data Callback ==========================#
def callback_pointcloud(data):
global pc2_data
global temp_drone_x, temp_drone_y, temp_drone_z
pc2_data = data
def get_z(x,y):
global pc2_data
assert isinstance(pc2_data, PointCloud2)
# gen = point_cloud2.read_points(pc2_data, field_names='z', skip_nans=False, uvs=[(x, y)])
gen = point_cloud2.read_points(pc2_data, field_names=("x", "y", "z"), skip_nans=False, uvs=[(x, y)])
# print("gen:",list(gen), [e for e in gen])#, list(gen)[0])# type(list(gen)[0]))
next_gen = next(gen)
temp_drone_x = next_gen[0]
temp_drone_y = next_gen[1]
temp_drone_z = next_gen[2]
# print(temp_drone_x, temp_drone_y, temp_drone_z)
# temp_drone_x, temp_drone_y, temp_drone_z = gen[0], gen[1], gen[2]
# print("Z:",next(gen)[0])
# return float(next(gen)[0]) #To return value out of funtion
return (temp_drone_x, temp_drone_y, temp_drone_z)
#============================ Image data Callback ==========================#
def image_callback(img_msg):
try:
# Initialize the CvBridge class
bridge = CvBridge()
# cv_image = bridge.imgmsg_to_cv2(img_msg, "passthrough")
cv_image = bridge.imgmsg_to_cv2(img_msg, "bgr8")
except CvBridgeError as e:
rospy.logerr("CvBridge Error: {0}".format(e))
# Show the converted image
show_image(cv_image)
#============================ Function to show/publish image ==========================#
def show_image(bgr_img):
global bounding_boxes
global temp_drone_x, temp_drone_y, temp_drone_z
global yolo_img_pub, poseArray_pub
image = bgr_img
font = cv2.FONT_HERSHEY_SIMPLEX
org = (00, 185) # org
fontScale = 1 # fontScale
color = (0, 0, 255) # Red color in BGR
thickness = 2 # Line thickness of 2 px
# bounding_boxes.sort(key=lambda x : x.xmin + x.xmax + x.ymin + x.ymax,reverse=True)
poseArray = PoseArray()
poseArray.header.frame_id = "world"
poseArray.header.stamp = rospy.Time.now()
for i, box in enumerate(bounding_boxes):
image = cv2.rectangle(image, (box.xmin, box.ymin), (box.xmax, box.ymax), (36,255,12), 2)
text = box.Class
# if text == "aeroplane" or text == "bird":
# text = "drone_" + str(i)
if text == "aeroplane" or text == "bird":
text = "drone"
image = cv2.putText(image, text, (box.xmin, box.ymin - 10), font, fontScale, color, thickness, cv2.LINE_AA, False) # Using cv2.putText() method
if pc2_data == None:
temp_drone_x, temp_drone_y, temp_drone_z = None, None, None
else:
(temp_drone_x, temp_drone_y, temp_drone_z) = get_z(int(round((box.xmin+box.xmax)/2)),int(round((box.ymin+box.ymax)/2)))
# print(temp_drone_x, temp_drone_y, temp_drone_z)
if np.isnan(temp_drone_x)!=1:
# position, quaternion = tf.TransformListener().lookupTransform("/world", "/stereo_camera_link", rospy.Time(0)) #tf.TransformListener().getLatestCommonTime("/stereo_camera_link", "/world")) #rospy.Time(0)
# temp_drone_x, temp_drone_y, temp_drone_z = temp_drone_x+position[0], temp_drone_y+position[1], temp_drone_z+position[2]
p1 = PoseStamped()
p1.header.frame_id = "stereo_camera2_link" #world
p1.pose.position.x,p1.pose.position.y, p1.pose.position.z = temp_drone_x, temp_drone_y, temp_drone_z
# p1.pose.orientation.w = 1.0 # Neutral orientation
# p1.pose.orientation.z = 1.0 # Neutral orientation
p1.pose.orientation.z = -0.7071068
p1.pose.orientation.w = 0.7071068
p_in_base = tf.TransformListener().transformPose("/world", p1)
# print("Drone_ID: {}, X: {}, Y: {}, Z: {} Probability: {}".format(i, temp_drone_x, temp_drone_y, temp_drone_z, box.probability))
print("Yolo_Drone_ID: {}, X: {}, Y: {}, Z: {} Probability: {}".format(i, round(p_in_base.pose.position.x,5),round(p_in_base.pose.position.y,5), round(p_in_base.pose.position.z,5), box.probability))
# print("position:", position)
# print("quaternion:", quaternion)
pose = Pose()
# pose.position.x,pose.position.y, pose.position.z = temp_drone_x, temp_drone_y, temp_drone_z
pose = p_in_base.pose
poseArray.poses.append( pose )
else:
print("None found!!!!")
poseArray_pub.publish( poseArray )
print("------------------------------")
# cv2.imshow("Image Window", image) #to show cv images
# cv2.waitKey(3)
# yolo_img_pub.publish(CvBridge().cv2_to_imgmsg(image, "bgr8"))
# image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
# cv2.VideoWriter("output.avi", cv2.VideoWriter_fourcc(*"MJPG"), 30,(640,480))
# image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
# yolo_img_pub.publish(CvBridge().cv2_to_imgmsg(image, "passthrough")) #passthrough #bgr8
yolo_img_pub.publish(CvBridge().cv2_to_imgmsg(image, "bgr8"))
bounding_boxes = []
def main():
global yolo_img_pub, poseArray_pub, nms_bbox_pub
global bounding_boxes
rospy.init_node('yolo_img_pub', anonymous=False)
# rospy.Subscriber("/usb_cam/image_raw", Image, image_callback) #to test it with usb_cam
# rospy.Subscriber("/darknet_ros/detection_image", Image, image_callback) #din't work well
rospy.Subscriber("/stereo_camera/left/image_raw", Image, image_callback)
rospy.Subscriber('/darknet_ros/bounding_boxes', BoundingBoxes , callback_boundingBox)
rospy.Subscriber('/stereo_camera/points2', PointCloud2, callback_pointcloud)
yolo_img_pub = rospy.Publisher("/yolo_img_pub/image_raw", Image, queue_size=50) #Publish Final Processed image
poseArray_pub = rospy.Publisher("/yolo_poseArray", PoseArray, queue_size=50) #Publish poseArray
nms_bbox_pub = rospy.Publisher('/darknet_ros/bounding_boxes_nms', BoundingBoxes, queue_size=10) #Publish BoundingBoxes
# if tf.frameExists("/world") and tf.frameExists("/map"):
# t = self.tf.getLatestCommonTime("/base_link", "/map")
# position, quaternion = self.tf.lookupTransform("/base_link", "/map", t)
while not rospy.is_shutdown():
rospy.spin()
if __name__ == '__main__':
try :
main()
except rospy.ROSInterruptException:
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment