Skip to content

Instantly share code, notes, and snippets.

@naveenrobo
Last active July 3, 2024 07:50
Show Gist options
  • Save naveenrobo/007a46447c589766a78ad35b0d17942a to your computer and use it in GitHub Desktop.
Save naveenrobo/007a46447c589766a78ad35b0d17942a to your computer and use it in GitHub Desktop.
imu_bosch.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32
from tf_transformations import euler_from_quaternion
from sensor_msgs.msg import Imu, MagneticField
from geometry_msgs.msg import Vector3Stamped
from math import radians
from numpy import sign
import socket
import json
import time
import traceback
class ImuDataExtract(Node):
def __init__(self):
super().__init__("imu_data_extract")
self.declare_parameters(
namespace='',
parameters=[
('udp_port', rclpy.Parameter.Type.INTEGER),
('calibration_mode', False)
]
)
self.udp_port = self.get_parameter('udp_port').value
self.calibration_mode = self.get_parameter('calibration_mode').value
self.initial_angle = None
self.imu_ready = True # Assuming IMU is ready since data is coming from mobile
self.default_value = False
self.last_default_value = None
self.imu_error = True
self.start_time = None
self.error_start_time = None
self.ori_pub = self.create_publisher(Vector3Stamped, '/orientation', 10)
self.imu_data_pub = self.create_publisher(Imu, "/sensors/imu", 10)
if self.calibration_mode:
self.accl_pub = self.create_publisher(Imu, '/imu/accelerometer', 10)
self.mag_data_pub = self.create_publisher(MagneticField, '/imu/magnetometer', 10)
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.bind(("0.0.0.0", self.udp_port))
print(f"Listening for UDP packets on port {self.udp_port}")
def read_imu_data(self):
while rclpy.ok():
try:
data, addr = self.socket.recvfrom(1024) # Buffer size is 1024 bytes
data = data.decode("utf-8").strip()
data_dict = json.loads(data)
euler_angle = euler_from_quaternion(
[data_dict['quatX'], data_dict['quatY'], data_dict['quatZ'], data_dict['quatW']])[2]
if self.initial_angle is None:
self.initial_angle = euler_angle
euler_angle = self.initial_angle - euler_angle
if sign(euler_angle) == -1:
euler_angle += 6.283185307179586
val_array = list(data_dict.values())
default_val = all(c == 0.0 for c in val_array)
if default_val and default_val != self.last_default_value:
self.error_start_time = time.time()
self.imu_error = True
elif not default_val and default_val != self.last_default_value:
self.error_start_time = None
self.imu_error = False
if self.imu_error:
if self.error_start_time is not None:
elapsed_time = time.time() - self.error_start_time
if elapsed_time > 5:
print("IMU error detected. No valid data for more than 5 seconds.")
else:
imu = Imu()
imu.header.frame_id = "imu_link"
imu.header.stamp = self.get_clock().now().to_msg()
imu.orientation.x = data_dict['quatX']
imu.orientation.y = data_dict['quatY']
imu.orientation.z = data_dict['quatZ']
imu.orientation.w = data_dict['quatW']
imu.angular_velocity.x = radians(data_dict['gyroX'])
imu.angular_velocity.y = radians(data_dict['gyroY'])
imu.angular_velocity.z = radians(data_dict['gyroZ'])
imu.linear_acceleration.x = data_dict['accelX']
imu.linear_acceleration.y = data_dict['accelY']
imu.linear_acceleration.z = data_dict['accelZ']
self.imu_data_pub.publish(imu)
data_msg = Vector3Stamped()
data_msg.header.frame_id = 'imu_link'
data_msg.header.stamp = self.get_clock().now().to_msg()
data_msg.vector.z = euler_angle
self.ori_pub.publish(data_msg)
if self.calibration_mode:
mag_data = MagneticField()
mag_data.header.frame_id = 'imu_link'
mag_data.header.stamp = self.get_clock().now().to_msg()
mag_data.magnetic_field.x = data_dict['magX']
mag_data.magnetic_field.y = data_dict['magY']
mag_data.magnetic_field.z = data_dict['magZ']
self.mag_data_pub.publish(mag_data)
self.accl_pub.publish(imu)
self.last_default_value = default_val
except Exception as e:
print(f"Unexpected error: {e}\n{traceback.format_exc()}")
def BoschIMUNode(args=None):
rclpy.init(args=args)
imu_extract = ImuDataExtract()
imu_extract.read_imu_data()
imu_extract.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
BoschIMUNode()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment