Skip to content

Instantly share code, notes, and snippets.

@rosterloh
Last active September 21, 2021 09:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rosterloh/7c15359aee3f221c792dada381507546 to your computer and use it in GitHub Desktop.
Save rosterloh/7c15359aee3f221c792dada381507546 to your computer and use it in GitHub Desktop.
Cli application to test dynamixel servos
import atexit
import argparse
from datetime import datetime, timedelta
from dynamixel_sdk import PortHandler
from dynamixel_sdk.robotis_def import COMM_SUCCESS
from dynamixel_sdk.packet_handler import PacketHandler
import logging
from logging.handlers import TimedRotatingFileHandler
import os
import sys
from time import sleep, time
ADDR_OPERATING_MODE = 11
ADDR_TORQUE_ENABLE = 64
ADDR_PROFILE_VELOCITY = 112
ADDR_GOAL_POSITION = 116
ADDR_MOVING = 122
ADDR_CURRENT = 126
ADDR_PRESENT_POSITION = 132
ADDR_PRESENT_VOLTAGE = 144
ADDR_PRESENT_TEMPERATURE = 146
POSITION_CONTROL_MODE = 3
EXT_POSITION_CONTROL_MODE = 4
TORQUE_ENABLE = 1
TORQUE_DISABLE = 0
MAX_VELOCITY = 534 # 120 rev/min at 0.229 [rev/min]
class DynamixelTester:
def __init__(self, args):
self.init_logging(args)
self.devices = []
self.currents = []
self.voltages = []
self.temperatures = []
self.test_number = 0
if self.init_motors(args):
atexit.register(self.cleanup)
if args.test.upper() == "A":
if args.motor1 is None:
raise RuntimeError("No motor ID given for motor 1")
logging.info("Running test type A")
test_start = datetime.now()
test_end = test_start + timedelta(days=3)
while datetime.now() < test_end:
self.test_number += 1
self.full_rotation(self.devices[0])
self.get_test_stats()
logging.info(f'Test completed {self.test_number} iterations in {str(test_end - test_start)}')
elif args.test.upper() == "B":
if args.motor2 is None:
raise RuntimeError("No motor ID given for motor 2")
logging.info("Running test type B")
test_start = datetime.now()
test_end = test_start + timedelta(days=3)
if len(self.devices) == 2:
mid = self.devices[1]
else:
mid = args.motor2
while datetime.now() < test_end:
self.test_number += 1
self.full_rotation(mid)
self.get_test_stats()
logging.info(f'Test completed {self.test_number} iterations in {str(test_end - test_start)}')
elif args.test.upper() == "C":
if args.motor1 is None:
raise RuntimeError("No motor ID given for motor 1")
if args.motor2 is None:
raise RuntimeError("No motor ID given for motor 2")
logging.info("Running test type C")
test_start = datetime.now()
test_end = test_start + timedelta(days=3)
pan_id = args.motor1
tilt_id = args.motor2
pan_pose = self.degrees_to_position(10) # 10 <-> 350 degrees
tilt_pose = self.degrees_to_position(90) # 90 <-> -90 degrees
self.move(pan_id, pan_pose)
self.move(tilt_id, tilt_pose)
while self.is_moving(pan_id) or self.is_moving(tilt_id):
pass
while datetime.now() < test_end:
self.test_number += 1
start_time = time()
for i in range(40):
if i < 20:
pan_pose += self.degrees_to_position(17)
tilt_pose -= self.degrees_to_position(9)
else:
pan_pose -= self.degrees_to_position(17)
tilt_pose += self.degrees_to_position(9)
self.move(pan_id, pan_pose)
self.move(tilt_id, tilt_pose)
while self.is_moving(pan_id) or self.is_moving(tilt_id):
self.get_status(pan_id)
self.get_status(tilt_id)
sleep(0.05)
logging.debug(f'Motors at {self.position_to_degrees(pan_pose)} {self.position_to_degrees(tilt_pose)}')
logging.info(f'Test {self.test_number} took {time()-start_time:.2f} s')
self.get_test_stats()
logging.info(f'Test completed {self.test_number} iterations in {str(test_end - test_start)}')
else:
logging.error("Unknown test type")
def init_logging(self, args):
if not os.path.exists('logs'):
os.makedirs('logs', exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s.%(msecs)03d :: %(levelname)s :: %(message)s',
datefmt="%H:%M:%S"
)
fh = TimedRotatingFileHandler(f'logs/{args.device.replace("/dev/", "")}-Test{args.test}.log', when='D')
fh.setLevel(logging.DEBUG)
fh_format = logging.Formatter('%(asctime)s.%(msecs)d :: %(levelname)s :: %(name)s :: %(message)s :: %(lineno)d', datefmt="%H:%M:%S")
fh.setFormatter(fh_format)
logging.getLogger().addHandler(fh)
def init_motors(self, args):
try:
self.portHandler = PortHandler(args.device)
self.packetHandler = PacketHandler(2.0)
if not self.portHandler.openPort():
raise RuntimeError('Unable to communicate with serial device')
if not self.portHandler.setBaudRate(57600):
raise RuntimeError('Unable to set baudrate')
for m in [args.motor1, args.motor2]:
if m is not None:
_, result, err = self.packetHandler.ping(self.portHandler, m)
if result != COMM_SUCCESS or err != 0:
logging.error(f'Failed to find motor with ID {m}. [{result} {err}]')
self.scan_for_motors()
return False
else:
result, err = self.packetHandler.write1ByteTxRx(self.portHandler, m, ADDR_TORQUE_ENABLE, TORQUE_DISABLE)
self.check_packet(result, err)
# Enable motor
result, err = self.packetHandler.write1ByteTxRx(self.portHandler, m, ADDR_OPERATING_MODE, EXT_POSITION_CONTROL_MODE)
if not self.check_packet(result, err):
raise RuntimeError('Could not set opperating mode')
# Set Profile Velocity to 120 rev/min
result, err = self.packetHandler.write4ByteTxRx(self.portHandler, m, ADDR_PROFILE_VELOCITY, MAX_VELOCITY)
if not self.check_packet(result, err):
raise RuntimeError('Could not set opperating mode')
result, err = self.packetHandler.write1ByteTxRx(self.portHandler, m, ADDR_TORQUE_ENABLE, TORQUE_ENABLE)
if not self.check_packet(result, err):
raise RuntimeError('Could not enable motor torque')
position, result, err = self.packetHandler.read4ByteTxRx(self.portHandler, m, ADDR_PRESENT_POSITION)
if not self.check_packet(result, err):
raise RuntimeError('Could not read motor position')
else:
logging.info(f'Motor with ID {m} is ready at position {position}')
self.devices.append(m)
except Exception as e:
logging.error(f'Failed to init motor connection. {e}')
return False
return True
def scan_for_motors(self):
logging.info('Scanning bus for motors')
device_ids = []
for i in range(0, 252):
id, result, err = self.packetHandler.ping(self.portHandler, i)
if result == COMM_SUCCESS and err == 0:
if id == 1190:
mtype = f'XL330-M077 ({id})'
elif id == 1200:
mtype = f'XL330-M288 ({id})'
else:
mtype = f'Unknown ({id})'
logging.info(f'Motor found at ID {i} of type {mtype}')
device_ids.append(i)
if len(device_ids) == 0:
logging.warning('No devices found')
def check_packet(self, dxl_comm_result, dxl_error):
if dxl_comm_result != COMM_SUCCESS:
logging.error(f'{self.packetHandler.getTxRxResult(dxl_comm_result)}')
return False
elif dxl_error != 0:
logging.error(f'{self.packetHandler.getRxPacketError(dxl_error)}')
return False
return True
def cleanup(self):
for m in self.devices:
result, err = self.packetHandler.write1ByteTxRx(self.portHandler, m, ADDR_TORQUE_ENABLE, TORQUE_DISABLE)
self.check_packet(result, err)
self.portHandler.closePort()
def position_to_degrees(self, position):
return round((position / 4096) * 360, 2)
def degrees_to_position(self, angle):
return round((angle / 360) * 4096)
def move(self, id, position):
logging.debug(f'Moving {id} to {self.position_to_degrees(position)} degrees')
result, err = self.packetHandler.write4ByteTxRx(self.portHandler, id, ADDR_GOAL_POSITION, position)
return self.check_packet(result, err)
def get_position(self, id):
position, result, err = self.packetHandler.read4ByteTxRx(self.portHandler, id, ADDR_PRESENT_POSITION)
if not self.check_packet(result, err):
return -1
else:
return position
def is_moving(self, id):
moving, result, err = self.packetHandler.read1ByteTxRx(self.portHandler, id, ADDR_MOVING)
if not self.check_packet(result, err):
return False
else:
return moving == 1
# TODO: Convert this to a GroupRead
def get_status(self, id):
# Current value is between +/- Current Limit(38) default 1750
current, result, err = self.packetHandler.read2ByteTxRx(self.portHandler, id, ADDR_CURRENT)
if self.check_packet(result, err):
if current > 1750:
current -= 0xFFFF
self.currents.append(current)
# Voltage is between Min Voltage Limit(32) and Max Voltage Limit(34) in units of 0.1 [V]
volts, result, err = self.packetHandler.read2ByteTxRx(self.portHandler, id, ADDR_PRESENT_VOLTAGE)
if self.check_packet(result, err):
self.voltages.append(volts)
temp, result, err = self.packetHandler.read1ByteTxRx(self.portHandler, id, ADDR_PRESENT_TEMPERATURE)
if self.check_packet(result, err):
self.temperatures.append(temp)
def full_rotation(self, mid):
for sp in [4095, 0]:
start_time = time()
self.move(mid, sp)
sleep(0.05)
while (self.is_moving(mid)):
self.get_status(mid)
sleep(0.1)
logging.debug(f'Move took {int((time()-start_time)*1e3)} ms')
sleep(0.1)
def get_test_stats(self):
if len(self.currents) > 0:
abs_i = [abs(i) for i in self.currents]
max_i = max([max(self.currents), min(abs_i)])
mean_i = sum(abs_i) / len(self.currents)
logging.info(f'CURRENT: max: {max_i:.2f} mean {mean_i:.2f}')
if len(self.voltages) > 0:
max_v = max(self.voltages)
mean_v = sum(self.voltages) / len(self.voltages)
logging.info(f'VOLTAGE: max: {max_v/10:.2f} mean {mean_v/10:.2f}')
if len(self.temperatures) > 0:
max_t = max(self.temperatures)
mean_t = sum(self.temperatures) / len(self.temperatures)
logging.info(f'TEMPERATURE: max {max_t:.1f} mean {mean_t:.1f}')
self.currents.clear()
self.voltages.clear()
self.temperatures.clear()
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="Gardin Dynamixel Tester")
parser.add_argument("-D", "--device", help="Serial device", required=True)
parser.add_argument("-m1", "--motor1", type=int, help="Pan Motor ID")
parser.add_argument("-m2", "--motor2", type=int, help="Tilt Motor ID")
parser.add_argument("-t", "--test", help="Test to run", default="A")
args = parser.parse_args()
try:
test = DynamixelTester(args)
except Exception as e:
logging.error(f'Exception event... error is: {e}')
except KeyboardInterrupt:
logging.info("Terminating script")
finally:
logging.info("Script Exit")
sys.exit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment