Skip to content

Instantly share code, notes, and snippets.

@awesomebytes
Created December 3, 2014 11:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save awesomebytes/feb225a2da753855d882 to your computer and use it in GitHub Desktop.
Save awesomebytes/feb225a2da753855d882 to your computer and use it in GitHub Desktop.
Subscriber tests for a publisher with queue 1
#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
Created on 03/12/14
@author: Sam Pfeiffer
"""
import rospy
from array import array
from std_msgs.msg import Header
global last_seq
last_seq = None
def topic_cb_with_wrong_processing(data):
global last_seq
if last_seq == None:
last_seq = data.seq
return
if last_seq + 1 != data.seq:
rospy.logerr("last_seq != curr_seq: " + str(last_seq + 1) + " != " + str(data.seq))
rospy.loginfo("Cb seq: " + str(data.seq))
last_seq = data.seq
def topic_cb_with_processing(data):
global last_seq
if last_seq == None:
last_seq = data.seq
return
else:
if last_seq + 1 != data.seq:
rospy.logerr("last_seq + 1 != curr_seq: " + str(last_seq + 1) + " != " + str(data.seq))
rospy.loginfo("Cb seq: " + str(data.seq))
last_seq = data.seq
def topic_cb(data):
rospy.loginfo("Cb seq: " + str(data.seq))
class TopicTestWithoutQueue():
def __init__(self):
self.seq_num = None
self.sub = rospy.Subscriber('/header_test', Header, self.topic_cb)
rospy.loginfo("Subscribed to /header_test")
def topic_cb(self, data):
rospy.loginfo("Seq: " + str(data.seq))
self.seq_num = data.seq
# If we lose some time here, we lose callbacks as the publisher has no queue
rospy.sleep(0.1)
def run(self):
last_seq = None
while not rospy.is_shutdown():
if self.seq_num != last_seq:
rospy.loginfo("Current seq: " + str(self.seq_num))
last_seq = self.seq_num
rospy.sleep(0.1)
class TopicTestWithQueue():
def __init__(self):
self.seq_nums = array('l') # efficient arrays
self.sub = rospy.Subscriber('/header_test', Header, self.topic_cb)
rospy.loginfo("Subscribed to /header_test")
def topic_cb(self, data):
rospy.loginfo("Seq: " + str(data.seq))
self.seq_nums.append( data.seq )
def run(self):
while not rospy.is_shutdown():
if self.seq_nums.buffer_info()[1] > 0: # is there anything in the array?
deleted_elem = self.seq_nums.pop(0)
addr, len = self.seq_nums.buffer_info()
rospy.loginfo("Last seq popped of " + str(len) + ": " + str(deleted_elem))
if __name__=="__main__":
rospy.init_node('test_topic_queue_subscriber_')
# Using topic test with queue
# We don't lose messages, and it works flawlessly (the main loop can get all the data)
tt = TopicTestWithQueue()
tt.run()
# Using topic test without queue
# We don't lose messages with this, but it gets very behind (thanks to the sleep)
# tt = TopicTestWithoutQueue()
# tt.run()
# using simple subscriber and callback
# at high rate, loses callbacks
# sub = rospy.Subscriber('/header_test', Header, topic_cb)
# rospy.spin()
# using a simple subscriber and callback with some processing in the callback
# at high rate, loses callbacks
# sub = rospy.Subscriber('/header_test', Header, topic_cb_with_processing)
# rospy.spin()
# using a simple subscriber and callback with some processing in the callback... wrongly done
# at high rate, loses callbacks
# sub = rospy.Subscriber('/header_test', Header, topic_cb_with_wrong_processing)
# rospy.spin()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment