Skip to content

Instantly share code, notes, and snippets.

@ZdenekM
Last active October 10, 2022 08:55
Show Gist options
  • Save ZdenekM/d84b178444950dbfe46afebf3fe1dd0d to your computer and use it in GitHub Desktop.
Save ZdenekM/d84b178444950dbfe46afebf3fe1dd0d to your computer and use it in GitHub Desktop.
import rospy
import sys
from sensor_msgs.msg import Image
from std_msgs.msg import String
import rosbag
from cv_bridge import CvBridge
import cv2
import json
import time
results = {}
times = {}
last_msg = time.time()
def results_cb(msg):
global last_msg
data = json.loads(msg.data)
ts = data["header"]["stamp"]
ats = time.time()
times[ts].append(ats)
results[ts] = data["detections"]
last_msg = ats
def main():
bag_path = sys.argv[1]
topic = sys.argv[2]
bag = rosbag.Bag(bag_path)
rospy.init_node('listener', anonymous=True)
rospy.Subscriber("/results", String, results_cb)
pub = rospy.Publisher('/movie', Image, queue_size=10)
bridge = CvBridge()
out = None
msgs = 0
print("Publishing...")
for topic, msg, t in bag.read_messages(topics=[topic]):
pub.publish(msg)
ts = "%s.%s" % (msg.header.stamp.secs, msg.header.stamp.nsecs)
times[ts] = [time.time()]
time.sleep(1.0)
msgs+=1
print("Published msgs: " + str(msgs))
print("Result msgs: " + str(len(results)))
while time.time() - last_msg < 5.0: # nevim jak lip poznat ze prisla posledni detekce
time.sleep(1)
print("Result msgs: " + str(len(results)))
if not results:
print("No results!")
return
frames = 0
for topic, msg, t in bag.read_messages(topics=[topic]):
res_key = "%s.%s" % (msg.header.stamp.secs, msg.header.stamp.nsecs)
res = results.pop(res_key, None)
cv_image = bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
if not out:
out_path = bag_path + ".avi"
print("Saving to " + out_path)
shape = tuple(cv_image.shape[1::-1])
print("Shape: " + str(shape))
fourcc = cv2.VideoWriter_fourcc(*'X264')
out = cv2.VideoWriter(out_path, fourcc, 25.0, shape)
frames += 1
if res:
for d in res:
cls_name = d["class_name"]
score = d["score"]
# Draw detection into frame.
if score < 0.5:
continue
# if cls_name in ["face", "person"]:
x1, y1, x2, y2 = [int(coord) for coord in d["bbox"]]
if abs(x2-x1) < 50 or abs(y2-y1) < 50:
continue
cv2.rectangle(cv_image, (x1, y1), (x2, y2), (255, 0, 0), 2)
cv2.putText(cv_image, cls_name, (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (36,255,12), 1, cv2.LINE_AA)
out.write(cv_image)
print("Frames: " + str(frames))
if out:
print("releasing")
out.release()
sum = 0
for vals in times.values():
if len(vals) < 2:
continue
sum += vals[1]-vals[0]
print("Mean latency: " + str(sum/len(times)))
if __name__ == '__main__':
try:
main()
except rospy.ROSInterruptException:
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment