Skip to content

Instantly share code, notes, and snippets.

@Lxnus
Last active March 7, 2023 23:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Lxnus/f510ebfb85612830bdb01615e926036c to your computer and use it in GitHub Desktop.
Save Lxnus/f510ebfb85612830bdb01615e926036c to your computer and use it in GitHub Desktop.
HWT9053-485 (Witmotion) modbus protocol implementation in python. (not finished yet!)
import time
from pymodbus.client import ModbusSerialClient
from pymodbus.constants import Endian
from pymodbus.payload import BinaryPayloadDecoder
class SensorInterface:
def __init__(self, port: str, baudrate: int = 921600, device_address: int = 0x50):
self.port = port
self.baudrate = baudrate
self.device_address = device_address
self.client = ModbusSerialClient(
method='rtu',
port=port,
baudrate=baudrate,
timeout=0.001,
parity='N',
stopbits=1,
bytesize=8
)
print("Connected: ", self.client.connect())
def get_features(self):
read = self.client.read_holding_registers(
address=0x30,
count=0x29,
slave=self.device_address,
)
if not read.isError():
decoded = BinaryPayloadDecoder.fromRegisters(
read.registers,
byteorder=Endian.Big,
wordorder=Endian.Big
)
data = {
"system_time": decoded.decode_16bit_int(), # 0x30
"system_chip": decoded.decode_16bit_int(), # 0x31
"unknown_0": decoded.decode_16bit_int(), # 0x32
"relative": decoded.decode_16bit_int(), # 0x33
"acceleration_x": decoded.decode_16bit_int() / 32768.0 * 16.0 * 9.81, # 0x34
"acceleration_y": decoded.decode_16bit_int() / 32768.0 * 16.0 * 9.81, # 0x35
"acceleration_z": decoded.decode_16bit_int() / 32768.0 * 16.0 * 9.81, # 0x36
"angular_velocity_x": decoded.decode_16bit_int() / 32768.0 * 2000.0, # 0x37
"angular_velocity_y": decoded.decode_16bit_int() / 32768.0 * 2000.0, # 0x38
"angular_velocity_z": decoded.decode_16bit_int() / 32768.0 * 2000.0, # 0x39
"magnetic_field_x": decoded.decode_16bit_int(), # 0x3A
"magnetic_field_y": decoded.decode_16bit_int(), # 0x3B
"magnetic_field_z": decoded.decode_16bit_int(), # 0x3C
"l_roll": decoded.decode_16bit_int(), # 0x3D
"h_roll": decoded.decode_16bit_int(), # 0x3E
"l_pitch": decoded.decode_16bit_int(), # 0x3F
"h_pitch": decoded.decode_16bit_int(), # 0x40
"l_yaw": decoded.decode_16bit_int(), # 0x41
"h_yaw": decoded.decode_16bit_int(), # 0x42
"temperatur": decoded.decode_16bit_int() / 100.0, # 0x43
"unknown_1": decoded.decode_16bit_int(), # 0x44
"unknown_2": decoded.decode_16bit_int(), # 0x45
"unknown_3": decoded.decode_16bit_int(), # 0x46
"unknown_4": decoded.decode_16bit_int(), # 0x47
"unknown_5": decoded.decode_16bit_int(), # 0x48
"unknown_6": decoded.decode_16bit_int(), # 0x49
"unknown_7": decoded.decode_16bit_int(), # 0x4A
"unknown_8": decoded.decode_16bit_int(), # 0x4B
"unknown_9": decoded.decode_16bit_int(), # 0x4C
"unknown_10": decoded.decode_16bit_int(), # 0x4D
"unknown_11": decoded.decode_16bit_int(), # 0x4E
"unknown_12": decoded.decode_16bit_int(), # 0x4F
"unknown_13": decoded.decode_16bit_int(), # 0x50
"quaternion_0": decoded.decode_16bit_int() / 32768.0, # 0x51
"quaternion_1": decoded.decode_16bit_int() / 32768.0, # 0x52
"quaternion_2": decoded.decode_16bit_int() / 32768.0, # 0x53
"quaternion_3": decoded.decode_16bit_int() / 32768.0, # 0x54
"unknown_14": decoded.decode_16bit_int(), # 0x55
"unknown_15": decoded.decode_16bit_int(), # 0x56
"unknown_16": decoded.decode_16bit_int(), # 0x57
"unknown_17": decoded.decode_16bit_int(), # 0x58
}
data["roll"] = ((data["h_roll"] << 16) | data["l_roll"]) / 1000.0
data["pitch"] = ((data["h_pitch"] << 16) | data["l_pitch"]) / 1000.0
data["yaw"] = ((data["h_yaw"] << 16) | data["l_yaw"]) / 1000.0
# Remove all unused values
del data["system_time"]
del data["system_chip"]
del data["relative"]
del data["l_roll"]
del data["h_roll"]
del data["l_pitch"]
del data["h_pitch"]
del data["l_yaw"]
del data["h_yaw"]
# Remove all unknown values
for key in list(data.keys()):
if key.startswith("unknown_"):
del data[key]
return data
def get_frequency(self):
read = self.client.read_holding_registers(
address=0x1F,
count=1,
slave=self.device_address,
)
if not read.isError():
decoded = BinaryPayloadDecoder.fromRegisters(
read.registers,
byteorder=Endian.Big,
wordorder=Endian.Big
)
return decoded.decode_16bit_int()
def get_baudrate(self):
read = self.client.read_holding_registers(
address=0x04,
count=1,
slave=self.device_address,
)
if not read.isError():
decoded = BinaryPayloadDecoder.fromRegisters(
read.registers,
byteorder=Endian.Big,
wordorder=Endian.Big
)
return decoded.decode_16bit_int()
def set_baudrate(self, baudrate_idx: int):
"""
Set the baudrate of the IMU
Values:
0x01: 4800bps
0x02: 9600bps
0x03: 19200bps
0x04: 38400bps
0x05: 57600bps
0x06: 115200bps
0x07: 230400bps
0x08: 460800bps
0x09: 921600bps
:param baudrate_idx:
:return:
"""
self.client.write_register(
address=0x69,
value=0xB588,
slave=self.device_address,
)
self.client.write_register(
address=0x04,
value=baudrate_idx,
slave=self.device_address,
)
self.client.write_register(
address=0x00,
value=0x00,
slave=self.device_address,
)
def set_frequency(self, frequency_idx: int):
"""
Set the frequency of the IMU
Values:
0x00: 256Hz
0x01: 188Hz
0x02: 98Hz
0x03: 42Hz
0x04: 20Hz
0x05: 10Hz
0x06: 5Hz
:param frequency_idx:
:return:
"""
self.client.write_register(
address=0x69,
value=0xB588,
slave=self.device_address,
)
self.client.write_register(
address=0x1F,
value=frequency_idx,
slave=self.device_address,
)
self.client.write_register(
address=0x00,
value=0x00,
slave=self.device_address,
)
def reboot(self):
self.client.write_register(
address=0x69,
value=0xB588,
slave=self.device_address,
)
self.client.write_register(
address=0x00,
value=0x00FF,
slave=self.device_address,
)
def get_acceleration(self):
read = self.client.read_holding_registers(
address=0x34,
count=3,
slave=self.device_address,
)
if not read.isError():
decoded = BinaryPayloadDecoder.fromRegisters(
read.registers,
byteorder=Endian.Big,
wordorder=Endian.Big
)
a_x = decoded.decode_16bit_int() / 32768.0 * 16.0 * 9.81
a_y = decoded.decode_16bit_int() / 32768.0 * 16.0 * 9.81
a_z = decoded.decode_16bit_int() / 32768.0 * 16.0 * 9.81
return a_x, a_y, a_z
def get_angular_velocity(self):
read = self.client.read_holding_registers(
address=0x37,
count=3,
slave=self.device_address,
)
if not read.isError():
decoded = BinaryPayloadDecoder.fromRegisters(
read.registers,
byteorder=Endian.Big,
wordorder=Endian.Big
)
w_x = decoded.decode_16bit_int() / 32768.0 * 2000.0
w_y = decoded.decode_16bit_int() / 32768.0 * 2000.0
w_z = decoded.decode_16bit_int() / 32768.0 * 2000.0
return w_x, w_y, w_z
def get_magnetic_field(self):
# TODO: Getting still the wrong values
read = self.client.read_holding_registers(
address=0x3A,
count=3,
slave=self.device_address,
)
if not read.isError():
decoded = BinaryPayloadDecoder.fromRegisters(
read.registers,
byteorder=Endian.Big,
wordorder=Endian.Big
)
m_x = decoded.decode_16bit_int()
m_y = decoded.decode_16bit_int()
m_z = decoded.decode_16bit_int()
return m_x, m_y, m_z
def get_angle_output(self):
read = self.client.read_holding_registers(
address=0x3D,
count=6,
slave=self.device_address,
)
if not read.isError():
decoded = BinaryPayloadDecoder.fromRegisters(
read.registers,
byteorder=Endian.Big,
wordorder=Endian.Big
)
l_roll = decoded.decode_16bit_int()
h_roll = decoded.decode_16bit_int()
roll = ((h_roll << 16) | l_roll) / 1000.0
l_pitch = decoded.decode_16bit_int()
h_pitch = decoded.decode_16bit_int()
pitch = ((h_pitch << 16) | l_pitch) / 1000.0
l_yaw = decoded.decode_16bit_int()
h_yaw = decoded.decode_16bit_int()
yaw = ((h_yaw << 16) | l_yaw) / 1000.0
return roll, pitch, yaw
def get_quaternion(self):
read = self.client.read_holding_registers(
address=0x51,
count=4,
slave=self.device_address,
)
if not read.isError():
decoded = BinaryPayloadDecoder.fromRegisters(
read.registers,
byteorder=Endian.Big,
wordorder=Endian.Big
)
q0 = decoded.decode_16bit_int() / 32768.0
q1 = decoded.decode_16bit_int() / 32768.0
q2 = decoded.decode_16bit_int() / 32768.0
q3 = decoded.decode_16bit_int() / 32768.0
return q0, q1, q2, q3
def get_temperature(self):
read = self.client.read_holding_registers(
address=0x43,
count=1,
slave=self.device_address,
)
if not read.isError():
decoded = BinaryPayloadDecoder.fromRegisters(
read.registers,
byteorder=Endian.Big,
wordorder=Endian.Big
)
return decoded.decode_16bit_int() / 100.0
if __name__ == '__main__':
sensor = SensorInterface(
port="COM4",
baudrate=460800,
device_address=0x50
)
while True:
print(sensor.get_angle_output())
time.sleep(0.5)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment