Skip to content

Instantly share code, notes, and snippets.

@bresilla
Created March 18, 2024 15:38
Show Gist options
  • Save bresilla/a7acd20f08221a86497582a9d079b170 to your computer and use it in GitHub Desktop.
Save bresilla/a7acd20f08221a86497582a9d079b170 to your computer and use it in GitHub Desktop.
complex_tracker
from typing import List
import numpy as np
import torch
import torchvision.ops.boxes as bops
import norfair
from norfair import Detection, Tracker, Paths
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import message_filters
from yolobot_interfaces.msg import Tags
class Trackit(Tracker):
def __init__(self, trackt):
super().__init__(
distance_function=self.iou if trackt == "bbox" else self.euclidean_distance,
distance_threshold=3.33 if trackt == "bbox" else 30)
self.track_form = trackt
self.pather = Paths()
def euclidean_distance(self, detection, tracked_object):
return np.linalg.norm(detection.points - tracked_object.estimate)
def center(self, points):
return [np.mean(np.array(points), axis=0)]
def iou_pytorch(self, detection, tracked_object):
detection_points = np.concatenate([detection.points[0], detection.points[1]])
tracked_object_points = np.concatenate([tracked_object.estimate[0], tracked_object.estimate[1]])
box_a = torch.tensor([detection_points], dtype=torch.float)
box_b = torch.tensor([tracked_object_points], dtype=torch.float)
iou = bops.box_iou(box_a, box_b)
return np.float(1 / iou if iou else 10000)
def iou(self, detection, tracked_object):
box_a = np.concatenate([detection.points[0], detection.points[1]])
box_b = np.concatenate([tracked_object.estimate[0], tracked_object.estimate[1]])
x_a = max(box_a[0], box_b[0])
y_a = max(box_a[1], box_b[1])
x_b = min(box_a[2], box_b[2])
y_b = min(box_a[3], box_b[3])
inter_area = max(0, x_b - x_a + 1) * max(0, y_b - y_a + 1)
box_a_area = (box_a[2] - box_a[0] + 1) * (box_a[3] - box_a[1] + 1)
box_b_area = (box_b[2] - box_b[0] + 1) * (box_b[3] - box_b[1] + 1)
iou = inter_area / float(box_a_area + box_b_area - inter_area)
return 1 / iou if iou else 10000
def gen_detections(self, tags) -> List[Detection]:
norfair_detections: List[Detection] = []
for detection in tags.data:
if self.track_form == "centroid":
points = np.array([
detection.x,
detection.y
])
scores = np.array([detection.p])
elif self.track_form == "bbox":
points = np.array([
[detection.x, detection.y],
[detection.x + detection.w, detection.y + detection.h]
])
scores = np.array([detection.p, detection.p])
norfair_detections.append(Detection(points=points, scores=scores))
return norfair_detections
def draw(self, frame, detections):
tracked_objects = self.update(detections=detections)
if self.track_form == "centroid":
norfair.draw_points(frame, detections)
norfair.draw_tracked_objects(frame, tracked_objects)
elif self.track_form == "bbox":
norfair.draw_boxes(frame, detections)
norfair.draw_tracked_boxes(frame, tracked_objects)
frame = self.pather.draw(frame, tracked_objects)
return frame
class Tracktor(Node):
def __init__(self):
super().__init__('tracker_node')
self.bridge = CvBridge()
self.bgr = (0, 255, 0)
self.tracker = Trackit("bbox")
self.i = 1
self.image_pub = self.create_publisher(Image, 'tracker', 10)
self.image_sub = message_filters.Subscriber(self, Image, '/image_raw')
self.tags_sub = message_filters.Subscriber(self, Tags, '/tags')
self.ts = message_filters.ApproximateTimeSynchronizer([self.image_sub, self.tags_sub], 10, slop=10)
self.ts.registerCallback(self.callback)
def callback(self, image, tags):
self.i += 1
print(self.i)
frame = self.bridge.imgmsg_to_cv2(image, "bgr8")
detections = self.tracker.gen_detections(tags)
self.tracker.draw(frame, detections)
labeled_frame = self.bridge.cv2_to_imgmsg(frame, encoding='bgr8')
self.image_pub.publish(labeled_frame)
def main():
rclpy.init(args=None)
tracktor = Tracktor()
rclpy.spin(tracktor)
rclpy.shutdown()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment