Skip to content

Instantly share code, notes, and snippets.

@felixvd
Created August 6, 2020 12:09
Show Gist options
  • Save felixvd/d75932fc0dd337e31c9e817412b39389 to your computer and use it in GitHub Desktop.
Save felixvd/d75932fc0dd337e31c9e817412b39389 to your computer and use it in GitHub Desktop.
Debugging notes for Modbus RTU/TCP for Robotiq gripper
#!/usr/bin/env python
import os
import sys
import socket
from math import ceil
import time
import threading
from pymodbus.client.sync import ModbusTcpClient
from pymodbus.register_read_message import ReadInputRegistersResponse
# Based on comModbusTcp.py and comModbusRtu.py in https://github.com/ros-industrial/robotiq
class ComModbus:
def __init__(self):
self.client = None
self.lock = threading.Lock()
def connectToDeviceTCP(self, address, port=''):
"""
Connection to the client - the method takes the IP address (as a string, e.g. '192.168.1.11') as an argument.
"""
if port:
self.client = ModbusTcpClient(address, port=port)
else:
self.client = ModbusTcpClient(address)
if not self.client.connect():
print("Unable to connect to " + address)
return False
def connectToDeviceSerial(self, device):
"""Connection to the client - the method takes the IP address (as a string, e.g. '192.168.1.11') as an argument."""
self.client = ModbusSerialClient(method='rtu',port=device,stopbits=1, bytesize=8, baudrate=115200, timeout=0.2)
if not self.client.connect():
print("Unable to connect to " + device)
return False
def disconnectFromDevice(self):
"""Close connection"""
self.client.close()
def sendCommand(self, data):
"""
Send a command to the Gripper - the method takes a list of uint8 as an argument.
The meaning of each variable depends on the Gripper model
"""
# make sure data has an even number of elements
if(len(data) % 2 == 1):
data.append(0)
# Initiate message as an empty list
message = []
# Fill message by combining two bytes in one register
for i in range(0, len(data)/2):
message.append((data[2*i] << 8) + data[2*i+1])
try:
with self.lock:
self.client.write_registers(0, message)
except Exception as e:
print("Encountered an exception when writing registers:")
print(e)
def getStatus(self, numBytes):
"""
Sends a request to read, wait for the response and returns the Gripper status.
The method gets the number of bytes to read as an argument
"""
numRegs = int(ceil(numBytes/2.0))
# Get status from the device
with self.lock:
response = self.client.read_input_registers(0, numRegs)
# Instantiate output as an empty list
output = []
# Fill the output with the bytes in the appropriate order
try:
for i in range(0, numRegs):
output.append((response.getRegister(i) & 0xFF00) >> 8)
output.append( response.getRegister(i) & 0x00FF)
except:
print("response for self.client.read_input_registers(0, numRegs) is not as expected: ")
print(response)
# Output the result
return output
class RobotiqCModel:
def __init__(self):
#Initiate output message as an empty list
self.message = []
def verifyCommand(self, command):
# Verify that each variable is in its correct range
command.rACT = max(0, command.rACT)
command.rACT = min(1, command.rACT)
command.rGTO = max(0, command.rGTO)
command.rGTO = min(1, command.rGTO)
command.rATR = max(0, command.rATR)
command.rATR = min(1, command.rATR)
command.rPR = max(0, command.rPR)
command.rPR = min(255, command.rPR)
command.rSP = max(0, command.rSP)
command.rSP = min(255, command.rSP)
command.rFR = max(0, command.rFR)
command.rFR = min(255, command.rFR)
# Return the cropped command
return command
def refreshCommand(self, command):
# Limit the value of each variable
command = self.verifyCommand(command)
# Initiate command as an empty list
self.message = []
# Build the command with each output variable
# TODO: add verification that all variables are in their authorized range
self.message.append(command.rACT + (command.rGTO << 3) + (command.rATR << 4))
self.message.append(0)
self.message.append(0)
self.message.append(command.rPR)
self.message.append(command.rSP)
self.message.append(command.rFR)
def sendCommand(self):
self.client.sendCommand(self.message)
def getStatus(self):
"""
Request the status from the gripper and return it in the CModelStatus msg type.
"""
#Acquire status from the Gripper
status = self.client.getStatus(6)
return status
# Message to report the gripper status
# message = CModelStatus()
# #Assign the values to their respective variables
# message.gACT = (status[0] >> 0) & 0x01;
# message.gGTO = (status[0] >> 3) & 0x01;
# message.gSTA = (status[0] >> 4) & 0x03;
# message.gOBJ = (status[0] >> 6) & 0x03;
# message.gFLT = status[2]
# message.gPR = status[3]
# message.gPO = status[4]
# message.gCU = status[5]
# return message
# ========
def mainLoop(address, port=''):
# Gripper is a C-Model with a TCP connection
print("Starting up")
gripper = RobotiqCModel()
gripper.client = ComModbus()
print("Connecting to TCP address " + str(address) + " and port " + str(port))
gripper.client.connectToDeviceTCP(address, port)
# print("Connecting to device " + str(address))
# gripper.client.connectToDeviceSerial(address)
while True:
# Get and publish the Gripper status
print("Getting status")
status = gripper.getStatus()
print("Got status:")
print(status)
# Wait a little
time.sleep(1)
# Send the most recent command
gripper.sendCommand()
# Wait a little
time.sleep(1)
if __name__ == '__main__':
# Verify user gave a legal IP address
try:
ip = sys.argv[1]
socket.inet_aton(ip)
except socket.error:
print('[cmodel_tcp_driver] Please provide a valid IP address')
# Run the main loop
try:
if sys.argv[2]:
mainLoop(sys.argv[1], sys.argv[2])
else:
mainLoop(sys.argv[1])
except Exception as e:
print('Received exception:')
print(e)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment