Skip to content

Instantly share code, notes, and snippets.

@jones2126

jones2126/driver.py

Created Aug 22, 2020
Embed
What would you like to do?
Updated driver.py from nmea_navsat_driver to filter speed <0.1 m/s
# Software License Agreement (BSD License)
#
# Copyright (c) 2013, Eric Perko
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the names of the authors nor the names of their
# affiliated organizations may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""Provides a driver for NMEA GNSS devices."""
import math
import rospy
from sensor_msgs.msg import NavSatFix, NavSatStatus, TimeReference
from geometry_msgs.msg import TwistStamped, QuaternionStamped
from tf.transformations import quaternion_from_euler
from std_msgs.msg import Float32
from libnmea_navsat_driver.checksum_utils import check_nmea_checksum
import libnmea_navsat_driver.parser
class RosNMEADriver(object):
"""ROS driver for NMEA GNSS devices."""
def __init__(self):
"""Initialize the ROS NMEA driver.
Creates the following ROS publishers:
NavSatFix publisher on the 'fix' channel.
TwistStamped publisher on the 'vel' channel.
QuaternionStamped publisher on the 'heading' channel.
TimeReference publisher on the 'time_reference' channel.
Reads the following ROS parameters:
~time_ref_source (str): The name of the source in published TimeReference messages. (default None)
~useRMC (bool): If true, use RMC NMEA messages. If false, use GGA and VTG messages. (default False)
~epe_quality0 (float): Value to use for default EPE quality for fix type 0. (default 1000000)
~epe_quality1 (float): Value to use for default EPE quality for fix type 1. (default 4.0)
~epe_quality2 (float): Value to use for default EPE quality for fix type 2. (default (0.1)
~epe_quality4 (float): Value to use for default EPE quality for fix type 4. (default 0.02)
~epe_quality5 (float): Value to use for default EPE quality for fix type 5. (default 4.0)
~epe_quality9 (float): Value to use for default EPE quality for fix type 9. (default 3.0)
"""
self.fix_pub = rospy.Publisher('fix', NavSatFix, queue_size=1)
self.vel_pub = rospy.Publisher('vel', TwistStamped, queue_size=1)
self.hdg_deg_pub = rospy.Publisher('heading_deg', Float32, queue_size=1)
self.heading_pub = rospy.Publisher(
'heading', QuaternionStamped, queue_size=1)
self.use_GNSS_time = rospy.get_param('~use_GNSS_time', False)
if not self.use_GNSS_time:
self.time_ref_pub = rospy.Publisher(
'time_reference', TimeReference, queue_size=1)
self.speed_pub = rospy.Publisher('speed', Float32, queue_size=1)
self.time_ref_source = rospy.get_param('~time_ref_source', None)
self.use_RMC = rospy.get_param('~useRMC', False)
self.valid_fix = False
self.last_moving_true_course = math.radians(100) # set at 100 degrees because that is my typical initial heading
# epe = estimated position error
self.default_epe_quality0 = rospy.get_param('~epe_quality0', 1000000)
self.default_epe_quality1 = rospy.get_param('~epe_quality1', 4.0)
self.default_epe_quality2 = rospy.get_param('~epe_quality2', 0.1)
self.default_epe_quality4 = rospy.get_param('~epe_quality4', 0.02)
self.default_epe_quality5 = rospy.get_param('~epe_quality5', 4.0)
self.default_epe_quality9 = rospy.get_param('~epe_quality9', 3.0)
self.using_receiver_epe = False
self.lon_std_dev = float("nan")
self.lat_std_dev = float("nan")
self.alt_std_dev = float("nan")
"""Format for this dictionary is the fix type from a GGA message as the key, with
each entry containing a tuple consisting of a default estimated
position error, a NavSatStatus value, and a NavSatFix covariance value.
GPS Quality indicator:
0: Fix not valid
1: GPS fix
2: Differential GPS fix, OmniSTAR VBS
4: Real-Time Kinematic, fixed integers
5: Real-Time Kinematic, float integers, OmniSTAR XP/HP or Location RTK
"""
self.gps_qualities = {
# Unknown
-1: [
self.default_epe_quality0,
NavSatStatus.STATUS_NO_FIX,
NavSatFix.COVARIANCE_TYPE_UNKNOWN
],
# Invalid
0: [
self.default_epe_quality0,
NavSatStatus.STATUS_NO_FIX,
NavSatFix.COVARIANCE_TYPE_UNKNOWN
],
# SPS
1: [
self.default_epe_quality1,
NavSatStatus.STATUS_FIX,
NavSatFix.COVARIANCE_TYPE_APPROXIMATED
],
# DGPS
2: [
self.default_epe_quality2,
NavSatStatus.STATUS_SBAS_FIX,
NavSatFix.COVARIANCE_TYPE_APPROXIMATED
],
# RTK Fix
4: [
self.default_epe_quality4,
NavSatStatus.STATUS_GBAS_FIX,
NavSatFix.COVARIANCE_TYPE_APPROXIMATED
],
# RTK Float
5: [
self.default_epe_quality5,
NavSatStatus.STATUS_GBAS_FIX,
NavSatFix.COVARIANCE_TYPE_APPROXIMATED
],
# WAAS
9: [
self.default_epe_quality9,
NavSatStatus.STATUS_GBAS_FIX,
NavSatFix.COVARIANCE_TYPE_APPROXIMATED
]
}
def add_sentence(self, nmea_string, frame_id, timestamp=None):
"""Public method to provide a new NMEA sentence to the driver.
Args:
nmea_string (str): NMEA sentence in string form.
frame_id (str): TF frame ID of the GPS receiver.
timestamp(rospy.Time, optional): Time the sentence was received.
If timestamp is not specified, the current time is used.
Returns:
bool: True if the NMEA string is successfully processed, False if there is an error.
"""
#rospy.loginfo("nmea_string: %s", nmea_string) # added for debugging
if not check_nmea_checksum(nmea_string):
rospy.logwarn("Received and skipping a sentence with an invalid checksum. " +
"Sentence was: %s" % repr(nmea_string))
return False
parsed_sentence = libnmea_navsat_driver.parser.parse_nmea_sentence(
nmea_string)
if not parsed_sentence:
rospy.logdebug(
"Failed to parse NMEA sentence. Sentence was: %s" %
nmea_string)
return False
if timestamp:
current_time = timestamp
else:
current_time = rospy.get_rostime()
current_fix = NavSatFix()
current_fix.header.stamp = current_time
current_fix.header.frame_id = frame_id
if not self.use_GNSS_time:
current_time_ref = TimeReference()
current_time_ref.header.stamp = current_time
current_time_ref.header.frame_id = frame_id
if self.time_ref_source:
current_time_ref.source = self.time_ref_source
else:
current_time_ref.source = frame_id
if not self.use_RMC and 'GGA' in parsed_sentence:
current_fix.position_covariance_type = \
NavSatFix.COVARIANCE_TYPE_APPROXIMATED
data = parsed_sentence['GGA']
if self.use_GNSS_time:
if math.isnan(data['utc_time'][0]):
rospy.logwarn("Time in the NMEA sentence is NOT valid")
return False
current_fix.header.stamp = rospy.Time(data['utc_time'][0], data['utc_time'][1])
fix_type = data['fix_type']
if not (fix_type in self.gps_qualities):
fix_type = -1
gps_qual = self.gps_qualities[fix_type]
default_epe = gps_qual[0]
current_fix.status.status = gps_qual[1]
current_fix.position_covariance_type = gps_qual[2]
if gps_qual > 0:
self.valid_fix = True
else:
self.valid_fix = False
current_fix.status.service = NavSatStatus.SERVICE_GPS
latitude = data['latitude']
if data['latitude_direction'] == 'S':
latitude = -latitude
current_fix.latitude = latitude
longitude = data['longitude']
if data['longitude_direction'] == 'W':
longitude = -longitude
current_fix.longitude = longitude
# Altitude is above ellipsoid, so adjust for mean-sea-level
altitude = data['altitude'] + data['mean_sea_level']
current_fix.altitude = altitude
# use default epe std_dev unless we've received a GST sentence with
# epes
if not self.using_receiver_epe or math.isnan(self.lon_std_dev):
self.lon_std_dev = default_epe
if not self.using_receiver_epe or math.isnan(self.lat_std_dev):
self.lat_std_dev = default_epe
if not self.using_receiver_epe or math.isnan(self.alt_std_dev):
self.alt_std_dev = default_epe * 2
hdop = data['hdop']
current_fix.position_covariance[0] = (hdop * self.lon_std_dev) ** 2
current_fix.position_covariance[4] = (hdop * self.lat_std_dev) ** 2
current_fix.position_covariance[8] = (
2 * hdop * self.alt_std_dev) ** 2 # FIXME
self.fix_pub.publish(current_fix)
if not (math.isnan(data['utc_time'][0]) or self.use_GNSS_time):
current_time_ref.time_ref = rospy.Time(
data['utc_time'][0], data['utc_time'][1])
self.last_valid_fix_time = current_time_ref
self.time_ref_pub.publish(current_time_ref)
elif not self.use_RMC and 'VTG' in parsed_sentence:
data = parsed_sentence['VTG']
# Only report VTG data when you've received a valid GGA fix as
# well.
if self.valid_fix:
current_vel = TwistStamped()
current_vel.header.stamp = current_time
current_vel.header.frame_id = frame_id
current_vel.twist.linear.x = data['speed'] * math.sin(data['true_course'])
current_vel.twist.linear.y = data['speed'] * math.cos(data['true_course'])
self.vel_pub.publish(current_vel)
elif 'RMC' in parsed_sentence:
data = parsed_sentence['RMC']
#rospy.loginfo("driver.py - true_course (radians): %s", data['true_course']) # added for debugging
#rospy.loginfo("driver.py - true_course (degrees): %s", data['true_course_deg']) # added for debugging
#rospy.loginfo("driver.py - data: %s", data) # added for debugging
#rospy.loginfo("driver.py - nmea_string: %s", nmea_string) # added for debugging
if self.use_GNSS_time:
if math.isnan(data['utc_time'][0]):
rospy.logwarn("Time in the NMEA sentence is NOT valid")
return False
current_fix.header.stamp = rospy.Time(data['utc_time'][0], data['utc_time'][1])
# Only publish a fix from RMC if the use_RMC flag is set.
if self.use_RMC:
if data['fix_valid']:
current_fix.status.status = NavSatStatus.STATUS_FIX
else:
current_fix.status.status = NavSatStatus.STATUS_NO_FIX
current_fix.status.service = NavSatStatus.SERVICE_GPS
latitude = data['latitude']
if data['latitude_direction'] == 'S':
latitude = -latitude
current_fix.latitude = latitude
longitude = data['longitude']
if data['longitude_direction'] == 'W':
longitude = -longitude
current_fix.longitude = longitude
current_fix.altitude = float('NaN')
current_fix.position_covariance_type = \
NavSatFix.COVARIANCE_TYPE_UNKNOWN
self.fix_pub.publish(current_fix)
if not (math.isnan(data['utc_time'][0]) or self.use_GNSS_time):
current_time_ref.time_ref = rospy.Time(
data['utc_time'][0], data['utc_time'][1])
self.time_ref_pub.publish(current_time_ref)
# Publish velocity from RMC regardless, since GGA doesn't provide
# it.
if data['fix_valid']:
current_vel = TwistStamped()
current_vel.header.stamp = current_time
current_vel.header.frame_id = frame_id
current_vel.twist.linear.x = data['speed'] * \
math.sin(data['true_course'])
current_vel.twist.linear.y = data['speed'] * \
math.cos(data['true_course'])
self.vel_pub.publish(current_vel)
self.speed_pub.publish(data['speed'])
self.hdg_deg_pub.publish(data['true_course_deg'])
if 'HDT' not in parsed_sentence:
#rospy.loginfo("driver.py - true_course: %s", data['true_course']) # added for debugging
#rospy.loginfo("driver.py - data: %s", data) # added for debugging
#rospy.loginfo("driver.py - nmea_string: %s", nmea_string) # added for debugging
current_heading = QuaternionStamped()
current_heading.header.stamp = current_time
current_heading.header.frame_id = frame_id
#q = quaternion_from_euler(0, 0, math.radians(data['true_course']))
#q = quaternion_from_euler(0, 0, data['true_course'])
if data['speed'] > 0.1: # use current true_course if tractor is moving
q = quaternion_from_euler(0, 0, data['true_course'])
self.last_moving_true_course = data['true_course'] # save last_hdg_deg
else:
q = quaternion_from_euler(0, 0, self.last_moving_true_course)
current_heading.quaternion.x = q[0]
current_heading.quaternion.y = q[1]
current_heading.quaternion.z = q[2]
current_heading.quaternion.w = q[3]
self.heading_pub.publish(current_heading)
else:
rospy.loginfo("driver.py - no valid fix, nmea_string: %s", nmea_string) # added for debugging
elif 'GST' in parsed_sentence:
data = parsed_sentence['GST']
# Use receiver-provided error estimate if available
self.using_receiver_epe = True
self.lon_std_dev = data['lon_std_dev']
self.lat_std_dev = data['lat_std_dev']
self.alt_std_dev = data['alt_std_dev']
elif 'HDT' in parsed_sentence:
data = parsed_sentence['HDT']
if data['heading']:
current_heading = QuaternionStamped()
current_heading.header.stamp = current_time
current_heading.header.frame_id = frame_id
q = quaternion_from_euler(0, 0, math.radians(data['heading']))
current_heading.quaternion.x = q[0]
current_heading.quaternion.y = q[1]
current_heading.quaternion.z = q[2]
current_heading.quaternion.w = q[3]
self.heading_pub.publish(current_heading)
else:
return False
@staticmethod
def get_frame_id():
"""Get the TF frame_id.
Queries rosparam for the ~frame_id param. If a tf_prefix param is set,
the frame_id is prefixed with the prefix.
Returns:
str: The fully-qualified TF frame ID.
"""
frame_id = rospy.get_param('~frame_id', 'gps')
# Add the TF prefix
prefix = ""
prefix_param = rospy.search_param('tf_prefix')
if prefix_param:
prefix = rospy.get_param(prefix_param)
return "%s/%s" % (prefix, frame_id)
else:
return frame_id
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment