-
-
Save iamrajee/d26ebde7a66217809c5e187f367a7dcc to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
SORT: A Simple, Online and Realtime Tracker | |
Copyright (C) 2016-2020 Alex Bewley alex@bewley.ai | |
This program is free software: you can redistribute it and/or modify | |
it under the terms of the GNU General Public License as published by | |
the Free Software Foundation, either version 3 of the License, or | |
(at your option) any later version. | |
This program is distributed in the hope that it will be useful, | |
but WITHOUT ANY WARRANTY; without even the implied warranty of | |
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
GNU General Public License for more details. | |
You should have received a copy of the GNU General Public License | |
along with this program. If not, see <http://www.gnu.org/licenses/>. | |
""" | |
from __future__ import print_function | |
import numpy as np | |
import cv2 | |
from skimage import io | |
import csv | |
import io | |
import math | |
import time | |
import argparse | |
from filterpy.kalman import KalmanFilter | |
from filterpy.kalman import ExtendedKalmanFilter #new | |
import rospy | |
from darknet_ros_msgs.msg import BoundingBoxes | |
from darknet_ros_msgs.msg import BoundingBox | |
from sensor_msgs.msg import Image | |
from cv_bridge import CvBridge, CvBridgeError | |
from std_msgs.msg import Int16 | |
import sys | |
import signal | |
def signal_handler(signal, frame): # ctrl + c -> exit program | |
print('You pressed Ctrl+C!') | |
sys.exit(0) | |
signal.signal(signal.SIGINT, signal_handler) | |
try: | |
from numba import jit | |
except: | |
def jit(func): | |
return func | |
np.random.seed(0) | |
#================== | |
from geometry_msgs.msg import PoseArray | |
from geometry_msgs.msg import Pose, PoseStamped | |
from sensor_msgs.msg import PointCloud2 | |
from sensor_msgs import point_cloud2 | |
pc2_data = None | |
temp_drone_x = None | |
temp_drone_y = None | |
temp_drone_z = None | |
import tf | |
from tf import TransformListener | |
#================== | |
def linear_assignment(cost_matrix): | |
try: | |
import lap | |
_, x, y = lap.lapjv(cost_matrix, extend_cost=True) | |
return np.array([[y[i],i] for i in x if i >= 0]) # | |
except ImportError: | |
from scipy.optimize import linear_sum_assignment | |
x, y = linear_sum_assignment(cost_matrix) | |
return np.array(list(zip(x, y))) | |
@jit | |
def iou(bb_test, bb_gt): | |
""" | |
Computes IUO between two bboxes in the form [x1,y1,x2,y2] | |
""" | |
xx1 = np.maximum(bb_test[0], bb_gt[0]) | |
yy1 = np.maximum(bb_test[1], bb_gt[1]) | |
xx2 = np.minimum(bb_test[2], bb_gt[2]) | |
yy2 = np.minimum(bb_test[3], bb_gt[3]) | |
w = np.maximum(0., xx2 - xx1) | |
h = np.maximum(0., yy2 - yy1) | |
wh = w * h | |
o = wh / ((bb_test[2] - bb_test[0]) * (bb_test[3] - bb_test[1]) | |
+ (bb_gt[2] - bb_gt[0]) * (bb_gt[3] - bb_gt[1]) - wh) | |
return(o) | |
def convert_bbox_to_z(bbox): | |
""" | |
Takes a bounding box in the form [x1,y1,x2,y2] and returns z in the form | |
[x,y,s,r] where x,y is the centre of the box and s is the scale/area and r is | |
the aspect ratio | |
""" | |
w = bbox[2] - bbox[0] | |
h = bbox[3] - bbox[1] | |
x = bbox[0] + w/2. | |
y = bbox[1] + h/2. | |
s = w * h #scale is just area | |
r = w / float(h) | |
return np.array([x, y, s, r]).reshape((4, 1)) | |
def convert_x_to_bbox(x,score=None): | |
""" | |
Takes a bounding box in the centre form [x,y,s,r] and returns it in the form | |
[x1,y1,x2,y2] where x1,y1 is the top left and x2,y2 is the bottom right | |
""" | |
w = np.sqrt(x[2] * x[3]) | |
h = x[2] / w | |
if(score==None): | |
return np.array([x[0]-w/2.,x[1]-h/2.,x[0]+w/2.,x[1]+h/2.]).reshape((1,4)) | |
else: | |
return np.array([x[0]-w/2.,x[1]-h/2.,x[0]+w/2.,x[1]+h/2.,score]).reshape((1,5)) | |
class KalmanBoxTracker(object): | |
""" | |
This class represents the internal state of individual tracked objects observed as bbox. | |
""" | |
count = 0 | |
def __init__(self,bbox): | |
""" | |
Initialises a tracker using initial bounding box. | |
""" | |
#define constant velocity model | |
# self.kf = KalmanFilter(dim_x=7, dim_z=4) | |
self.kf = ExtendedKalmanFilter(dim_x=7, dim_z=4) | |
self.kf.F = np.array([[1,0,0,0,1,0,0],[0,1,0,0,0,1,0],[0,0,1,0,0,0,1],[0,0,0,1,0,0,0], [0,0,0,0,1,0,0],[0,0,0,0,0,1,0],[0,0,0,0,0,0,1]]) | |
self.kf.H = np.array([[1,0,0,0,0,0,0],[0,1,0,0,0,0,0],[0,0,1,0,0,0,0],[0,0,0,1,0,0,0]]) | |
self.kf.R[2:,2:] *= 10. | |
self.kf.P[4:,4:] *= 1000. #give high uncertainty to the unobservable initial velocities | |
self.kf.P *= 10. | |
self.kf.Q[-1,-1] *= 0.01 | |
self.kf.Q[4:,4:] *= 0.01 | |
self.kf.x[:4] = convert_bbox_to_z(bbox) | |
self.time_since_update = 0 | |
self.id = KalmanBoxTracker.count | |
KalmanBoxTracker.count += 1 | |
self.history = [] | |
self.hits = 0 | |
self.hit_streak = 0 | |
self.age = 0 | |
self.start_time = time.time() | |
self.end_time = time.time() | |
def H_of(x,y): | |
# H = np.array([[1,0,0,0,1,0,0], | |
# [0,1,0,0,0,1,0], | |
# [0,0,1,0,0,0,1], | |
# [0,0,0,1,0,0,0], | |
# [0,0,0,0,1,0,0], | |
# [0,0,0,0,0,1,0], | |
# [0,0,0,0,0,0,1]]) | |
# H = np.array([[1,0,0,0], | |
# [0,1,0,0], | |
# [0,0,1,0], | |
# [0,0,0,1]]) | |
H = np.array([[1,0,0,0,0,0,0], | |
[0,1,0,0,0,0,0], | |
[0,0,1,0,0,0,0], | |
[0,0,0,1,0,0,0]]) | |
return H | |
def Hx(x,y): | |
Hx = np.array([[0], | |
[0], | |
[0], | |
[0]]) | |
# Hx = np.array([[0], | |
# [0], | |
# [0], | |
# [0], | |
# [0], | |
# [0], | |
# [0]]) | |
# Hx = np.array([[1,0,0,0], | |
# [0,1,0,0], | |
# [0,0,1,0], | |
# [0,0,0,1]]) | |
return Hx | |
def update(self,bbox): | |
""" | |
Updates the state vector with observed bbox. | |
""" | |
self.time_since_update = 0 | |
self.history = [] | |
self.hits += 1 | |
self.hit_streak += 1 | |
# self.kf.update(convert_bbox_to_z(bbox)) | |
self.kf.update(convert_bbox_to_z(bbox), HJacobian=self.H_of, Hx=self.Hx) | |
self.start_time = time.time() | |
def predict(self): | |
""" | |
Advances the state vector and returns the predicted bounding box estimate. | |
""" | |
if((self.kf.x[6]+self.kf.x[2])<=0): | |
self.kf.x[6] *= 0.0 | |
self.kf.predict() | |
self.age += 1 | |
# self.end_time = time.time() | |
if(self.time_since_update>0): | |
self.hit_streak = 0 | |
self.time_since_update += 1 | |
self.history.append(convert_x_to_bbox(self.kf.x)) | |
# print("age:", self.age, ", time_since_update:", self.time_since_update, ", elapsed_time:", str(round(self.end_time - self.start_time, 5)), " Sec") | |
return self.history[-1] | |
def get_state(self): | |
""" | |
Returns the current bounding box estimate. | |
""" | |
return convert_x_to_bbox(self.kf.x) | |
def associate_detections_to_trackers(detections,trackers,iou_threshold = 0.3): | |
""" | |
Assigns detections to tracked object (both represented as bounding boxes) | |
Returns 3 lists of matches, unmatched_detections and unmatched_trackers | |
""" | |
if(len(trackers)==0): | |
return np.empty((0,2),dtype=int), np.arange(len(detections)), np.empty((0,5),dtype=int) | |
iou_matrix = np.zeros((len(detections),len(trackers)),dtype=np.float32) | |
for d,det in enumerate(detections): | |
for t,trk in enumerate(trackers): | |
iou_matrix[d,t] = iou(det,trk) | |
if min(iou_matrix.shape) > 0: | |
a = (iou_matrix > iou_threshold).astype(np.int32) | |
if a.sum(1).max() == 1 and a.sum(0).max() == 1: | |
matched_indices = np.stack(np.where(a), axis=1) | |
else: | |
matched_indices = linear_assignment(-iou_matrix) | |
else: | |
matched_indices = np.empty(shape=(0,2)) | |
unmatched_detections = [] | |
for d, det in enumerate(detections): | |
if(d not in matched_indices[:,0]): | |
unmatched_detections.append(d) | |
unmatched_trackers = [] | |
for t, trk in enumerate(trackers): | |
if(t not in matched_indices[:,1]): | |
unmatched_trackers.append(t) | |
#filter out matched with low IOU | |
matches = [] | |
for m in matched_indices: | |
if(iou_matrix[m[0], m[1]]<iou_threshold): | |
unmatched_detections.append(m[0]) | |
unmatched_trackers.append(m[1]) | |
else: | |
matches.append(m.reshape(1,2)) | |
if(len(matches)==0): | |
matches = np.empty((0,2),dtype=int) | |
else: | |
matches = np.concatenate(matches,axis=0) | |
return matches, np.array(unmatched_detections), np.array(unmatched_trackers) | |
class Sort(object): | |
def __init__(self, max_age=5, min_hits=1): | |
rospy.init_node('sort', anonymous=True) | |
bbox_topic = rospy.get_param("/bbox_topic", 'darknet_ros/bounding_boxes/') | |
self.subb = rospy.Subscriber(bbox_topic, BoundingBoxes, self.boxcallback) | |
self.subb2 = rospy.Subscriber("/steps_done", Int16, step_callback) | |
multi_array_sub = rospy.Subscriber('/multi_array', Float64MultiArray, multi_array_callback) | |
self.pubb = rospy.Publisher('tracked_boxes', BoundingBoxes, queue_size=50) | |
self.rate = rospy.Rate(30) | |
display = rospy.get_param("/display", True) | |
max_age = rospy.get_param("/max_age", max_age) | |
min_hits = rospy.get_param("/min_hits", min_hits) | |
self.iou_threshold = rospy.get_param("/iou_threshold", 0.3) | |
rospy.Subscriber('/stereo_camera/points2', PointCloud2, callback_pointcloud) | |
if display: | |
img_topic = rospy.get_param("/img_topic", '/imx477/image_raw') | |
self.display = display | |
self.subimage = rospy.Subscriber(img_topic, Image, self.imgcallback) | |
self.pubimage = rospy.Publisher('tracked_image', Image, queue_size=20) | |
""" | |
Sets key parameters for SORT | |
""" | |
self.max_age = max_age | |
self.min_hits = min_hits | |
self.trackers = [] | |
self.frame_count = 0 | |
#self.colours = np.random.rand(32, 3) #used only for display | |
self.img_in = 0 | |
self.bbox_checkin = 0 | |
self.bridge = CvBridge() | |
def imgcallback(self, msg): | |
temp_image = msg | |
if self.display: | |
try : | |
self.img = self.bridge.imgmsg_to_cv2(temp_image, "bgr8") | |
except CvBridgeError as e: | |
pass | |
self.img_in = 1 | |
return | |
def boxcallback(self, msg): | |
dets = [] | |
for i in range(len(msg.bounding_boxes)): | |
bbox = msg.bounding_boxes[i] | |
if bbox.Class == "aeroplane" or bbox.Class == "bird" or bbox.Class == "kite" or bbox.Class == "Drone": | |
dets.append(np.array([bbox.xmin, bbox.ymin, bbox.xmax, bbox.ymax, bbox.probability])) | |
self.dets = np.array(dets) | |
self.bbox_checkin=1 | |
return | |
def update(self, dets=np.empty((0, 5))): | |
""" | |
Params: | |
dets - a numpy array of detections in the format [[x1,y1,x2,y2,score],[x1,y1,x2,y2,score],...] | |
Requires: this method must be called once for each frame even with empty detections (use np.empty((0, 5)) for frames without detections). | |
Returns the a similar array, where the last column is the object ID. | |
NOTE: The number of objects returned may differ from the number of detections provided. | |
""" | |
self.frame_count += 1 | |
# get predicted locations from existing trackers. | |
trks = np.zeros((len(self.trackers), 5)) | |
to_del = [] | |
ret = [] | |
for t, trk in enumerate(trks): | |
pos = self.trackers[t].predict()[0] | |
trk[:] = [pos[0], pos[1], pos[2], pos[3], 0] | |
if np.any(np.isnan(pos)): | |
to_del.append(t) | |
trks = np.ma.compress_rows(np.ma.masked_invalid(trks)) | |
for t in reversed(to_del): | |
self.trackers.pop(t) | |
matched, unmatched_dets, unmatched_trks = associate_detections_to_trackers(dets,trks,self.iou_threshold) | |
# update matched trackers with assigned detections | |
for m in matched: | |
self.trackers[m[1]].update(dets[m[0], :]) | |
# create and initialise new trackers for unmatched detections | |
for i in unmatched_dets: | |
trk = KalmanBoxTracker(dets[i,:]) | |
self.trackers.append(trk) | |
i = len(self.trackers) | |
for trk in reversed(self.trackers): | |
d = trk.get_state()[0] | |
if (trk.time_since_update < 1) and (trk.hit_streak >= self.min_hits or self.frame_count <= self.min_hits): | |
ret.append(np.concatenate((d,[trk.id+1])).reshape(1,-1)) # +1 as MOT benchmark requires positive | |
i -= 1 | |
# remove dead tracklet | |
if(trk.time_since_update > self.max_age): | |
self.trackers.pop(i) | |
trk.end_time = time.time() | |
print("age:", trk.age, ", time_since_update:", trk.time_since_update, ", elapsed_time:", str(round(trk.end_time - trk.start_time, 5)), " Sec") | |
if(len(ret)>0): | |
return np.concatenate(ret) | |
return np.empty((0,5)) | |
#=============== | |
def callback_pointcloud(data): | |
global pc2_data | |
global temp_drone_x, temp_drone_y, temp_drone_z | |
pc2_data = data | |
def get_z(x,y): | |
global pc2_data | |
assert isinstance(pc2_data, PointCloud2) | |
# gen = point_cloud2.read_points(pc2_data, field_names='z', skip_nans=False, uvs=[(x, y)]) | |
gen = point_cloud2.read_points(pc2_data, field_names=("x", "y", "z"), skip_nans=False, uvs=[(x, y)]) | |
# print("gen:",list(gen), [e for e in gen])#, list(gen)[0])# type(list(gen)[0])) | |
##=== comment temporeliy | |
# next_gen = next(gen) | |
# temp_drone_x = next_gen[0] | |
# temp_drone_y = next_gen[1] | |
# temp_drone_z = next_gen[2] | |
temp_drone_x, temp_drone_y, temp_drone_z=0,0,0 | |
# print(temp_drone_x, temp_drone_y, temp_drone_z) | |
# temp_drone_x, temp_drone_y, temp_drone_z = gen[0], gen[1], gen[2] | |
# print("Z:",next(gen)[0]) | |
# return float(next(gen)[0]) #To return value out of funtion | |
return (temp_drone_x, temp_drone_y, temp_drone_z) | |
#=============== | |
actual_steps = 0 | |
previous_actual_steps = -1 | |
def step_callback(data): | |
global actual_steps | |
# print("=============> data:",data.data) | |
actual_steps = data.data | |
from std_msgs.msg import Float64 | |
from std_msgs.msg import Float64MultiArray | |
multi_array = None | |
from sklearn.neighbors import KDTree | |
def multi_array_callback(data): | |
global multi_array | |
# print("=============> data:",data.data) | |
multi_array = np.array(data.data).reshape(10,3) | |
# print("multi_array", multi_array) | |
if __name__ == '__main__': | |
colours = np.random.rand(32, 3) #used only for display | |
# print("colours:",colours) #debug | |
mot_tracker = Sort(max_age=200, min_hits=1) #create instance of the SORT tracker | |
font = cv2.FONT_HERSHEY_SIMPLEX | |
org = (00, 185) # org | |
fontScale = 1 # fontScale | |
color = (0, 0, 255) # Red color in BGR | |
thickness = 2 # Line thickness of 2 px | |
loopcount=0 | |
with io.open('/home/rajendra/swarm_ws/csv/Tracked.csv', 'w+', encoding='UTF8') as f: | |
writer = csv.writer(f) | |
# writer = csv.writer(f, quoting=csv.QUOTE_NONNUMERIC) | |
# writer.writerow(['sq', 'id'].decode('utf-8')) | |
# writer.writerow('\n'.join([i.decode('utf-8', 'ignore') for i in ['sq', 'id']])) | |
# writer.writerow(["sq","id","xmin","ymin","xmax","ymax"]) | |
writer.writerow(["steps","Drone_ID", "X", "Y", "Z", "Guessed_Agentpy_ID", 'GX', 'GY', 'GZ', 'GError']) | |
# cr = csv.writer(f,delimiter=";",lineterminator="\n") | |
# cr.writerow(["sq","id","xmin","ymin","xmax","ymax"]) | |
while True: | |
try: | |
start_time_cycle = time.time() | |
if mot_tracker.bbox_checkin==1: | |
trackers = mot_tracker.update(mot_tracker.dets) | |
# mot_tracker.bbox_checkin=0 ##old <========= | |
mot_tracker.bbox_checkin=1 #new <========= (don't reset) | |
else: | |
trackers = mot_tracker.update(np.empty((0,5))) | |
# print("len(trackers):",len(trackers)) | |
loopcount=loopcount+1 | |
r = BoundingBoxes() | |
for d in range(len(trackers)): | |
rb = BoundingBox() | |
rb.probability=1 | |
rb.xmin = int(trackers[d,0]) | |
rb.ymin = int(trackers[d,1]) | |
rb.xmax = int(trackers[d,2]) | |
rb.ymax = int(trackers[d,3]) | |
rb.id = int(trackers[d,4]) | |
rb.Class = 'tracked' | |
# print("Old_CSV(sq,id,xmin,ymin,xmax,ymax):",loopcount, rb.id,rb.xmin,rb.ymin,rb.xmax,rb.ymax) | |
# writer.writerow([loopcount, rb.id,rb.xmin,rb.ymin,rb.xmax,rb.ymax]) | |
#=============== | |
box = rb | |
if pc2_data == None: | |
temp_drone_x, temp_drone_y, temp_drone_z = None, None, None | |
else: | |
(temp_drone_x, temp_drone_y, temp_drone_z) = get_z(int(round((box.xmin+box.xmax)/2)),int(round((box.ymin+box.ymax)/2))) | |
p1 = PoseStamped() | |
p1.header.frame_id = "stereo_camera2_link" #world | |
p1.pose.position.x,p1.pose.position.y, p1.pose.position.z = temp_drone_x, temp_drone_y, temp_drone_z | |
# p1.pose.orientation.w = 1.0 # Neutral orientation | |
# p1.pose.orientation.z = 1.0 # Neutral orientation | |
p1.pose.orientation.z = -0.7071068 | |
p1.pose.orientation.w = 0.7071068 | |
listener = tf.TransformListener() | |
listener.waitForTransform("/world", "/stereo_camera2_link", rospy.Time(), rospy.Duration(4.0)) | |
# (trans, rot) = listener.lookupTransform("/frame1", "/frame2", rospy.Time(0)) | |
p_in_base = listener.transformPose("/world", p1) | |
# p_in_base = tf.TransformListener().transformPose("/world", p1) | |
# print("New_Yolo_Drone_ID: {}, X: {}, Y: {}, Z: {}".format(rb.id, round(p_in_base.pose.position.x,4)*100,round(p_in_base.pose.position.y,4)*100, round(p_in_base.pose.position.z,4)*100)) | |
# writer.writerow([loopcount,rb.id, round(p_in_base.pose.position.x,4)*100,round(p_in_base.pose.position.y,4)*100, round(p_in_base.pose.position.z,4)*100]) | |
if actual_steps != previous_actual_steps: | |
if math.isnan(temp_drone_z)!=1: | |
Current_Tracked = [[p_in_base.pose.position.x, p_in_base.pose.position.y, p_in_base.pose.position.z]] | |
tree = KDTree(multi_array, leaf_size=2) | |
dist, ind = tree.query(Current_Tracked, k=1) | |
# print("dist,ind:", dist,ind) | |
Guessed_Agentpy_ID = ind[0][0] | |
GX,GY,GZ = multi_array[ind[0][0]]*100 #convert to cm | |
GError=dist[0][0]*100 #convert to cm | |
print("Steps: {}, Tracked_ID: {}, X: {}, Y: {}, Z: {}, Guessed_Agentpy_ID: {}, GX: {}, GY: {}, GZ: {}, GError: {}".format(actual_steps, rb.id, round(p_in_base.pose.position.x,4)*100,round(p_in_base.pose.position.y,4)*100, round(p_in_base.pose.position.z,4)*100, Guessed_Agentpy_ID, GX, GY, GZ, GError)) | |
writer.writerow([actual_steps,rb.id, round(p_in_base.pose.position.x,4)*100,round(p_in_base.pose.position.y,4)*100, round(p_in_base.pose.position.z,4)*100, Guessed_Agentpy_ID, GX, GY, GZ, GError]) | |
#=============== | |
r.bounding_boxes.append(rb) | |
if mot_tracker.img_in==1 and mot_tracker.display: | |
res = trackers[d].astype(np.int32) | |
rgb=colours[res[4]%32,:]*255 | |
cv2.rectangle(mot_tracker.img, (res[0],res[1]), (res[2],res[3]), (rgb[0],rgb[1],rgb[2]), 4) | |
# cv2.putText(mot_tracker.img, "ID : %d"%(res[4]), (res[0],res[1]), cv2.FONT_HERSHEY_SIMPLEX, 1.6, (200,85,200), 6) | |
# cv2.putText(mot_tracker.img, "Drone_%d"%(res[4]), (res[0],res[1] - 10), font, fontScale, color, thickness, cv2.LINE_AA, False) | |
cv2.putText(mot_tracker.img, "%d"%(res[4]), (res[0],res[1] - 10), font, fontScale, color, thickness, cv2.LINE_AA, False) | |
if mot_tracker.img_in==1 and mot_tracker.display: | |
try : | |
mot_tracker.image = mot_tracker.bridge.cv2_to_imgmsg(mot_tracker.img, "bgr8") | |
mot_tracker.image.header.stamp = rospy.Time.now() | |
mot_tracker.pubimage.publish(mot_tracker.image) | |
except CvBridgeError as e: | |
pass | |
cycle_time = time.time() - start_time_cycle | |
if len(r.bounding_boxes)>0: #prevent empty box | |
r.header.stamp = rospy.Time.now() | |
mot_tracker.pubb.publish(r) | |
# print("cycle_time: ", cycle_time) | |
mot_tracker.rate.sleep() | |
previous_actual_steps = actual_steps | |
except (rospy.ROSInterruptException, SystemExit, KeyboardInterrupt): | |
sys.exit(0) | |
# except: | |
# pass | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment