-
-
Save iamrajee/d29959fba946a41d98e184d42036479e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# from math import nan | |
import rospy | |
from sensor_msgs.msg import Image | |
from darknet_ros_msgs.msg import BoundingBoxes | |
from std_msgs.msg import Header | |
from std_msgs.msg import String | |
from sensor_msgs.msg import PointCloud2 | |
from sensor_msgs import point_cloud2 | |
from geometry_msgs.msg import PoseArray | |
from geometry_msgs.msg import Pose, PoseStamped | |
import numpy as np | |
import tf | |
# tf2_ros | |
from tf import TransformListener | |
import cv2 | |
from cv_bridge import CvBridge, CvBridgeError | |
bounding_boxes = [] | |
yolo_img_pub, poseArray_pub, nms_bbox_pub = None, None, None | |
box_z = {} | |
def non_max_suppression_slow(boxes, overlapThresh): | |
# if there are no boxes, return an empty list | |
if len(boxes) == 0: | |
return [] | |
# initialize the list of picked indexes | |
pick = [] | |
# grab the coordinates of the bounding boxes | |
# x1 = boxes[:,0] | |
# y1 = boxes[:,1] | |
# x2 = boxes[:,2] | |
# y2 = boxes[:,3] | |
# x1 = [x[0] for x in boxes] | |
# y1 = [x[1] for x in boxes] | |
# x2 = [x[2] for x in boxes] | |
# y2 = [x[3] for x in boxes] | |
x1 = np.array([x.xmin for x in boxes]) | |
y1 = np.array([x.ymin for x in boxes]) | |
x2 = np.array([x.xmax for x in boxes]) | |
y2 = np.array([x.ymax for x in boxes]) | |
scores = np.array([x.probability for x in boxes]) | |
# compute the area of the bounding boxes and sort the bounding | |
# boxes by the bottom-right y-coordinate of the bounding box | |
area = (x2 - x1 + 1) * (y2 - y1 + 1) | |
# idxs = np.argsort(y2) | |
idxs = np.argsort(area) | |
# keep looping while some indexes still remain in the indexes | |
# list | |
while len(idxs) > 0: | |
# grab the last index in the indexes list, add the index | |
# value to the list of picked indexes, then initialize | |
# the suppression list (i.e. indexes that will be deleted) | |
# using the last index | |
last = len(idxs) - 1 | |
i = idxs[last] | |
pick.append(i) | |
suppress = [last] | |
# loop over all indexes in the indexes list | |
for pos in range(0, last): | |
# grab the current index | |
j = idxs[pos] | |
''' | |
Let say the new bounding box formed by intersection is b3, then to find area of b3 we need to find it parameters. | |
x_sort = sort(b1.xmin, b1.xmax, b2.xmin, b2.xmax) | |
y_sort = sort(b1.ymin, b1.ymax, b2.ymin, b2.ymax) | |
b3.xmin, b3.xmax = x_sort[1], x_sort[2] #i.e 2nd & 3rd element | |
b3.ymin, b3.ymax = y_sort[1], y_sort[2] #i.e 2nd & 3rd element | |
b3.area = (b3.xmax - b3.xmin)*(b3.ymax-b3.ymin) | |
Below is a alterate way to find b3 parameter | |
''' | |
# find the largest (x, y) coordinates for the start of | |
# the bounding box and the smallest (x, y) coordinates | |
# for the end of the bounding box | |
xx1 = max(x1[i], x1[j]) #x_sort[1] = maxima of b1.xmin & b2.xmin i.e 2nd element of sort(b1.xmin, b1.xmax, b2.xmin, b2.xmax) | |
yy1 = max(y1[i], y1[j]) #y_sort[1] = maxima of b1.ymin & b2.ymin i.e 2nd element of sort(b1.ymin, b1.ymax, b2.ymin, b2.ymax) | |
xx2 = min(x2[i], x2[j]) #x_sort[2] = minima of b1.xmax & b2.xmax i.e 3rd element of sort(b1.xmin, b1.xmax, b2.xmin, b2.xmax) | |
yy2 = min(y2[i], y2[j]) #y_sort[2] = minima of b1.ymax & b2.ymax i.e 3rd element of sort(b1.ymin, b1.ymax, b2.ymin, b2.ymax) | |
# compute the width and height of the bounding box | |
w = max(0, xx2 - xx1) # + 1) | |
h = max(0, yy2 - yy1) # + 1) | |
# compute the ratio of overlap between the computed | |
# bounding box and the bounding box in the area list | |
''' We are not using iou''' | |
# iou = float(w * h) / (area[i] + area[j] - float(w * h)) | |
''' Instead we are using (intersection_area / area_of_smallest_box)''' | |
overlap = float(w * h) / area[j] | |
# if there is sufficient overlap, suppress the | |
# current bounding box | |
if overlap > overlapThresh: | |
suppress.append(pos) | |
# delete all indexes from the index list that are in the | |
# suppression list | |
idxs = np.delete(idxs, suppress) | |
# return only the bounding boxes that were picked | |
# return boxes[pick] | |
return [boxes[i] for i in pick] | |
#============================ Bounding Box data Callback ==========================# | |
def callback_boundingBox(data): | |
global bounding_boxes | |
global pc2_data | |
global nms_bbox_pub | |
bounding_boxes = data.bounding_boxes | |
# pick = non_max_suppression_slow(np.array(bounding_boxes).transpose, 0.3) | |
input_nms = bounding_boxes | |
# input_nms = np.array(bounding_boxes) | |
# print("input_nms:", input_nms) | |
pick = non_max_suppression_slow(input_nms, 0.5) | |
bounding_boxes = pick | |
# print("pick:", pick) | |
print("Differene in len- earlier:", len(input_nms), ", now:", len(pick)) | |
bbox_msg = BoundingBoxes() | |
bbox_msg.header.frame_id = "detection" | |
bbox_msg.header.stamp = rospy.Time.now() | |
bbox_msg.image_header.frame_id = "stereo_camera2_link" | |
bbox_msg.image_header.stamp = rospy.Time.now() | |
bbox_msg.bounding_boxes = bounding_boxes | |
nms_bbox_pub.publish(bbox_msg) | |
# for box in data.bounding_boxes: | |
# if pc2_data == None: | |
# z=None | |
# else: | |
# get_z(round((box.xmin+box.xmax)/2),round((box.ymin+box.ymax)/2)) | |
# rospy.loginfo( | |
# "ID: {}, Class: {}, Probability: {}, Xmin: {}, Xmax: {} Ymin: {}, Ymax: {}".format( | |
# box.id, box.Class, box.probability, box.xmin, box.xmax, box.ymin, box.ymax | |
# ) | |
# ) | |
# print(type(box))+ | |
# print("------------------------------") | |
pc2_data = None | |
temp_drone_x = None | |
temp_drone_y = None | |
temp_drone_z = None | |
#============================ PointCloud data Callback ==========================# | |
def callback_pointcloud(data): | |
global pc2_data | |
global temp_drone_x, temp_drone_y, temp_drone_z | |
pc2_data = data | |
def get_z(x,y): | |
global pc2_data | |
assert isinstance(pc2_data, PointCloud2) | |
# gen = point_cloud2.read_points(pc2_data, field_names='z', skip_nans=False, uvs=[(x, y)]) | |
gen = point_cloud2.read_points(pc2_data, field_names=("x", "y", "z"), skip_nans=False, uvs=[(x, y)]) | |
# print("gen:",list(gen), [e for e in gen])#, list(gen)[0])# type(list(gen)[0])) | |
next_gen = next(gen) | |
temp_drone_x = next_gen[0] | |
temp_drone_y = next_gen[1] | |
temp_drone_z = next_gen[2] | |
# print(temp_drone_x, temp_drone_y, temp_drone_z) | |
# temp_drone_x, temp_drone_y, temp_drone_z = gen[0], gen[1], gen[2] | |
# print("Z:",next(gen)[0]) | |
# return float(next(gen)[0]) #To return value out of funtion | |
return (temp_drone_x, temp_drone_y, temp_drone_z) | |
#============================ Image data Callback ==========================# | |
def image_callback(img_msg): | |
try: | |
# Initialize the CvBridge class | |
bridge = CvBridge() | |
# cv_image = bridge.imgmsg_to_cv2(img_msg, "passthrough") | |
cv_image = bridge.imgmsg_to_cv2(img_msg, "bgr8") | |
except CvBridgeError as e: | |
rospy.logerr("CvBridge Error: {0}".format(e)) | |
# Show the converted image | |
show_image(cv_image) | |
#============================ Function to show/publish image ==========================# | |
def show_image(bgr_img): | |
global bounding_boxes | |
global temp_drone_x, temp_drone_y, temp_drone_z | |
global yolo_img_pub, poseArray_pub | |
image = bgr_img | |
font = cv2.FONT_HERSHEY_SIMPLEX | |
org = (00, 185) # org | |
fontScale = 1 # fontScale | |
color = (0, 0, 255) # Red color in BGR | |
thickness = 2 # Line thickness of 2 px | |
# bounding_boxes.sort(key=lambda x : x.xmin + x.xmax + x.ymin + x.ymax,reverse=True) | |
poseArray = PoseArray() | |
poseArray.header.frame_id = "world" | |
poseArray.header.stamp = rospy.Time.now() | |
for i, box in enumerate(bounding_boxes): | |
image = cv2.rectangle(image, (box.xmin, box.ymin), (box.xmax, box.ymax), (36,255,12), 2) | |
text = box.Class | |
# if text == "aeroplane" or text == "bird": | |
# text = "drone_" + str(i) | |
if text == "aeroplane" or text == "bird": | |
text = "drone" | |
image = cv2.putText(image, text, (box.xmin, box.ymin - 10), font, fontScale, color, thickness, cv2.LINE_AA, False) # Using cv2.putText() method | |
if pc2_data == None: | |
temp_drone_x, temp_drone_y, temp_drone_z = None, None, None | |
else: | |
(temp_drone_x, temp_drone_y, temp_drone_z) = get_z(int(round((box.xmin+box.xmax)/2)),int(round((box.ymin+box.ymax)/2))) | |
# print(temp_drone_x, temp_drone_y, temp_drone_z) | |
if np.isnan(temp_drone_x)!=1: | |
# position, quaternion = tf.TransformListener().lookupTransform("/world", "/stereo_camera_link", rospy.Time(0)) #tf.TransformListener().getLatestCommonTime("/stereo_camera_link", "/world")) #rospy.Time(0) | |
# temp_drone_x, temp_drone_y, temp_drone_z = temp_drone_x+position[0], temp_drone_y+position[1], temp_drone_z+position[2] | |
p1 = PoseStamped() | |
p1.header.frame_id = "stereo_camera2_link" #world | |
p1.pose.position.x,p1.pose.position.y, p1.pose.position.z = temp_drone_x, temp_drone_y, temp_drone_z | |
# p1.pose.orientation.w = 1.0 # Neutral orientation | |
# p1.pose.orientation.z = 1.0 # Neutral orientation | |
p1.pose.orientation.z = -0.7071068 | |
p1.pose.orientation.w = 0.7071068 | |
p_in_base = tf.TransformListener().transformPose("/world", p1) | |
# print("Drone_ID: {}, X: {}, Y: {}, Z: {} Probability: {}".format(i, temp_drone_x, temp_drone_y, temp_drone_z, box.probability)) | |
print("Yolo_Drone_ID: {}, X: {}, Y: {}, Z: {} Probability: {}".format(i, round(p_in_base.pose.position.x,5),round(p_in_base.pose.position.y,5), round(p_in_base.pose.position.z,5), box.probability)) | |
# print("position:", position) | |
# print("quaternion:", quaternion) | |
pose = Pose() | |
# pose.position.x,pose.position.y, pose.position.z = temp_drone_x, temp_drone_y, temp_drone_z | |
pose = p_in_base.pose | |
poseArray.poses.append( pose ) | |
else: | |
print("None found!!!!") | |
poseArray_pub.publish( poseArray ) | |
print("------------------------------") | |
# cv2.imshow("Image Window", image) #to show cv images | |
# cv2.waitKey(3) | |
# yolo_img_pub.publish(CvBridge().cv2_to_imgmsg(image, "bgr8")) | |
# image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) | |
# cv2.VideoWriter("output.avi", cv2.VideoWriter_fourcc(*"MJPG"), 30,(640,480)) | |
# image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) | |
# yolo_img_pub.publish(CvBridge().cv2_to_imgmsg(image, "passthrough")) #passthrough #bgr8 | |
yolo_img_pub.publish(CvBridge().cv2_to_imgmsg(image, "bgr8")) | |
bounding_boxes = [] | |
def main(): | |
global yolo_img_pub, poseArray_pub, nms_bbox_pub | |
global bounding_boxes | |
rospy.init_node('yolo_img_pub', anonymous=False) | |
# rospy.Subscriber("/usb_cam/image_raw", Image, image_callback) #to test it with usb_cam | |
# rospy.Subscriber("/darknet_ros/detection_image", Image, image_callback) #din't work well | |
rospy.Subscriber("/stereo_camera/left/image_raw", Image, image_callback) | |
rospy.Subscriber('/darknet_ros/bounding_boxes', BoundingBoxes , callback_boundingBox) | |
rospy.Subscriber('/stereo_camera/points2', PointCloud2, callback_pointcloud) | |
yolo_img_pub = rospy.Publisher("/yolo_img_pub/image_raw", Image, queue_size=50) #Publish Final Processed image | |
poseArray_pub = rospy.Publisher("/yolo_poseArray", PoseArray, queue_size=50) #Publish poseArray | |
nms_bbox_pub = rospy.Publisher('/darknet_ros/bounding_boxes_nms', BoundingBoxes, queue_size=10) #Publish BoundingBoxes | |
# if tf.frameExists("/world") and tf.frameExists("/map"): | |
# t = self.tf.getLatestCommonTime("/base_link", "/map") | |
# position, quaternion = self.tf.lookupTransform("/base_link", "/map", t) | |
while not rospy.is_shutdown(): | |
rospy.spin() | |
if __name__ == '__main__': | |
try : | |
main() | |
except rospy.ROSInterruptException: | |
pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment