Last active
March 7, 2023 23:15
HWT9053-485 (Witmotion) modbus protocol implementation in python. (not finished yet!)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from pymodbus.client import ModbusSerialClient | |
from pymodbus.constants import Endian | |
from pymodbus.payload import BinaryPayloadDecoder | |
class SensorInterface: | |
def __init__(self, port: str, baudrate: int = 921600, device_address: int = 0x50): | |
self.port = port | |
self.baudrate = baudrate | |
self.device_address = device_address | |
self.client = ModbusSerialClient( | |
method='rtu', | |
port=port, | |
baudrate=baudrate, | |
timeout=0.001, | |
parity='N', | |
stopbits=1, | |
bytesize=8 | |
) | |
print("Connected: ", self.client.connect()) | |
def get_features(self): | |
read = self.client.read_holding_registers( | |
address=0x30, | |
count=0x29, | |
slave=self.device_address, | |
) | |
if not read.isError(): | |
decoded = BinaryPayloadDecoder.fromRegisters( | |
read.registers, | |
byteorder=Endian.Big, | |
wordorder=Endian.Big | |
) | |
data = { | |
"system_time": decoded.decode_16bit_int(), # 0x30 | |
"system_chip": decoded.decode_16bit_int(), # 0x31 | |
"unknown_0": decoded.decode_16bit_int(), # 0x32 | |
"relative": decoded.decode_16bit_int(), # 0x33 | |
"acceleration_x": decoded.decode_16bit_int() / 32768.0 * 16.0 * 9.81, # 0x34 | |
"acceleration_y": decoded.decode_16bit_int() / 32768.0 * 16.0 * 9.81, # 0x35 | |
"acceleration_z": decoded.decode_16bit_int() / 32768.0 * 16.0 * 9.81, # 0x36 | |
"angular_velocity_x": decoded.decode_16bit_int() / 32768.0 * 2000.0, # 0x37 | |
"angular_velocity_y": decoded.decode_16bit_int() / 32768.0 * 2000.0, # 0x38 | |
"angular_velocity_z": decoded.decode_16bit_int() / 32768.0 * 2000.0, # 0x39 | |
"magnetic_field_x": decoded.decode_16bit_int(), # 0x3A | |
"magnetic_field_y": decoded.decode_16bit_int(), # 0x3B | |
"magnetic_field_z": decoded.decode_16bit_int(), # 0x3C | |
"l_roll": decoded.decode_16bit_int(), # 0x3D | |
"h_roll": decoded.decode_16bit_int(), # 0x3E | |
"l_pitch": decoded.decode_16bit_int(), # 0x3F | |
"h_pitch": decoded.decode_16bit_int(), # 0x40 | |
"l_yaw": decoded.decode_16bit_int(), # 0x41 | |
"h_yaw": decoded.decode_16bit_int(), # 0x42 | |
"temperatur": decoded.decode_16bit_int() / 100.0, # 0x43 | |
"unknown_1": decoded.decode_16bit_int(), # 0x44 | |
"unknown_2": decoded.decode_16bit_int(), # 0x45 | |
"unknown_3": decoded.decode_16bit_int(), # 0x46 | |
"unknown_4": decoded.decode_16bit_int(), # 0x47 | |
"unknown_5": decoded.decode_16bit_int(), # 0x48 | |
"unknown_6": decoded.decode_16bit_int(), # 0x49 | |
"unknown_7": decoded.decode_16bit_int(), # 0x4A | |
"unknown_8": decoded.decode_16bit_int(), # 0x4B | |
"unknown_9": decoded.decode_16bit_int(), # 0x4C | |
"unknown_10": decoded.decode_16bit_int(), # 0x4D | |
"unknown_11": decoded.decode_16bit_int(), # 0x4E | |
"unknown_12": decoded.decode_16bit_int(), # 0x4F | |
"unknown_13": decoded.decode_16bit_int(), # 0x50 | |
"quaternion_0": decoded.decode_16bit_int() / 32768.0, # 0x51 | |
"quaternion_1": decoded.decode_16bit_int() / 32768.0, # 0x52 | |
"quaternion_2": decoded.decode_16bit_int() / 32768.0, # 0x53 | |
"quaternion_3": decoded.decode_16bit_int() / 32768.0, # 0x54 | |
"unknown_14": decoded.decode_16bit_int(), # 0x55 | |
"unknown_15": decoded.decode_16bit_int(), # 0x56 | |
"unknown_16": decoded.decode_16bit_int(), # 0x57 | |
"unknown_17": decoded.decode_16bit_int(), # 0x58 | |
} | |
data["roll"] = ((data["h_roll"] << 16) | data["l_roll"]) / 1000.0 | |
data["pitch"] = ((data["h_pitch"] << 16) | data["l_pitch"]) / 1000.0 | |
data["yaw"] = ((data["h_yaw"] << 16) | data["l_yaw"]) / 1000.0 | |
# Remove all unused values | |
del data["system_time"] | |
del data["system_chip"] | |
del data["relative"] | |
del data["l_roll"] | |
del data["h_roll"] | |
del data["l_pitch"] | |
del data["h_pitch"] | |
del data["l_yaw"] | |
del data["h_yaw"] | |
# Remove all unknown values | |
for key in list(data.keys()): | |
if key.startswith("unknown_"): | |
del data[key] | |
return data | |
def get_frequency(self): | |
read = self.client.read_holding_registers( | |
address=0x1F, | |
count=1, | |
slave=self.device_address, | |
) | |
if not read.isError(): | |
decoded = BinaryPayloadDecoder.fromRegisters( | |
read.registers, | |
byteorder=Endian.Big, | |
wordorder=Endian.Big | |
) | |
return decoded.decode_16bit_int() | |
def get_baudrate(self): | |
read = self.client.read_holding_registers( | |
address=0x04, | |
count=1, | |
slave=self.device_address, | |
) | |
if not read.isError(): | |
decoded = BinaryPayloadDecoder.fromRegisters( | |
read.registers, | |
byteorder=Endian.Big, | |
wordorder=Endian.Big | |
) | |
return decoded.decode_16bit_int() | |
def set_baudrate(self, baudrate_idx: int): | |
""" | |
Set the baudrate of the IMU | |
Values: | |
0x01: 4800bps | |
0x02: 9600bps | |
0x03: 19200bps | |
0x04: 38400bps | |
0x05: 57600bps | |
0x06: 115200bps | |
0x07: 230400bps | |
0x08: 460800bps | |
0x09: 921600bps | |
:param baudrate_idx: | |
:return: | |
""" | |
self.client.write_register( | |
address=0x69, | |
value=0xB588, | |
slave=self.device_address, | |
) | |
self.client.write_register( | |
address=0x04, | |
value=baudrate_idx, | |
slave=self.device_address, | |
) | |
self.client.write_register( | |
address=0x00, | |
value=0x00, | |
slave=self.device_address, | |
) | |
def set_frequency(self, frequency_idx: int): | |
""" | |
Set the frequency of the IMU | |
Values: | |
0x00: 256Hz | |
0x01: 188Hz | |
0x02: 98Hz | |
0x03: 42Hz | |
0x04: 20Hz | |
0x05: 10Hz | |
0x06: 5Hz | |
:param frequency_idx: | |
:return: | |
""" | |
self.client.write_register( | |
address=0x69, | |
value=0xB588, | |
slave=self.device_address, | |
) | |
self.client.write_register( | |
address=0x1F, | |
value=frequency_idx, | |
slave=self.device_address, | |
) | |
self.client.write_register( | |
address=0x00, | |
value=0x00, | |
slave=self.device_address, | |
) | |
def reboot(self): | |
self.client.write_register( | |
address=0x69, | |
value=0xB588, | |
slave=self.device_address, | |
) | |
self.client.write_register( | |
address=0x00, | |
value=0x00FF, | |
slave=self.device_address, | |
) | |
def get_acceleration(self): | |
read = self.client.read_holding_registers( | |
address=0x34, | |
count=3, | |
slave=self.device_address, | |
) | |
if not read.isError(): | |
decoded = BinaryPayloadDecoder.fromRegisters( | |
read.registers, | |
byteorder=Endian.Big, | |
wordorder=Endian.Big | |
) | |
a_x = decoded.decode_16bit_int() / 32768.0 * 16.0 * 9.81 | |
a_y = decoded.decode_16bit_int() / 32768.0 * 16.0 * 9.81 | |
a_z = decoded.decode_16bit_int() / 32768.0 * 16.0 * 9.81 | |
return a_x, a_y, a_z | |
def get_angular_velocity(self): | |
read = self.client.read_holding_registers( | |
address=0x37, | |
count=3, | |
slave=self.device_address, | |
) | |
if not read.isError(): | |
decoded = BinaryPayloadDecoder.fromRegisters( | |
read.registers, | |
byteorder=Endian.Big, | |
wordorder=Endian.Big | |
) | |
w_x = decoded.decode_16bit_int() / 32768.0 * 2000.0 | |
w_y = decoded.decode_16bit_int() / 32768.0 * 2000.0 | |
w_z = decoded.decode_16bit_int() / 32768.0 * 2000.0 | |
return w_x, w_y, w_z | |
def get_magnetic_field(self): | |
# TODO: Getting still the wrong values | |
read = self.client.read_holding_registers( | |
address=0x3A, | |
count=3, | |
slave=self.device_address, | |
) | |
if not read.isError(): | |
decoded = BinaryPayloadDecoder.fromRegisters( | |
read.registers, | |
byteorder=Endian.Big, | |
wordorder=Endian.Big | |
) | |
m_x = decoded.decode_16bit_int() | |
m_y = decoded.decode_16bit_int() | |
m_z = decoded.decode_16bit_int() | |
return m_x, m_y, m_z | |
def get_angle_output(self): | |
read = self.client.read_holding_registers( | |
address=0x3D, | |
count=6, | |
slave=self.device_address, | |
) | |
if not read.isError(): | |
decoded = BinaryPayloadDecoder.fromRegisters( | |
read.registers, | |
byteorder=Endian.Big, | |
wordorder=Endian.Big | |
) | |
l_roll = decoded.decode_16bit_int() | |
h_roll = decoded.decode_16bit_int() | |
roll = ((h_roll << 16) | l_roll) / 1000.0 | |
l_pitch = decoded.decode_16bit_int() | |
h_pitch = decoded.decode_16bit_int() | |
pitch = ((h_pitch << 16) | l_pitch) / 1000.0 | |
l_yaw = decoded.decode_16bit_int() | |
h_yaw = decoded.decode_16bit_int() | |
yaw = ((h_yaw << 16) | l_yaw) / 1000.0 | |
return roll, pitch, yaw | |
def get_quaternion(self): | |
read = self.client.read_holding_registers( | |
address=0x51, | |
count=4, | |
slave=self.device_address, | |
) | |
if not read.isError(): | |
decoded = BinaryPayloadDecoder.fromRegisters( | |
read.registers, | |
byteorder=Endian.Big, | |
wordorder=Endian.Big | |
) | |
q0 = decoded.decode_16bit_int() / 32768.0 | |
q1 = decoded.decode_16bit_int() / 32768.0 | |
q2 = decoded.decode_16bit_int() / 32768.0 | |
q3 = decoded.decode_16bit_int() / 32768.0 | |
return q0, q1, q2, q3 | |
def get_temperature(self): | |
read = self.client.read_holding_registers( | |
address=0x43, | |
count=1, | |
slave=self.device_address, | |
) | |
if not read.isError(): | |
decoded = BinaryPayloadDecoder.fromRegisters( | |
read.registers, | |
byteorder=Endian.Big, | |
wordorder=Endian.Big | |
) | |
return decoded.decode_16bit_int() / 100.0 | |
if __name__ == '__main__': | |
sensor = SensorInterface( | |
port="COM4", | |
baudrate=460800, | |
device_address=0x50 | |
) | |
while True: | |
print(sensor.get_angle_output()) | |
time.sleep(0.5) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment