Skip to content

Instantly share code, notes, and snippets.

@hashb
Last active July 30, 2019 22:13
Show Gist options
  • Save hashb/e3058bf39f8a5f1753e9aa425b719f30 to your computer and use it in GitHub Desktop.
Save hashb/e3058bf39f8a5f1753e9aa425b719f30 to your computer and use it in GitHub Desktop.
test ROS simple_message
#!/usr/bin/python
import struct
class SimpleMessage(object):
COMM_TOPIC = 1
COMM_REQUEST = 2
COMM_REPLY = 3
REPLY_NA = 0
REPLY_SUCCESS = 1
REPLY_FAILURE = 2
REPLY_EXECUTING = 3
NUM_JOINTS = 10
'''
Joint trajectory point request:
int length; /* 4 bytes, constant value should be 16x4 = 64 */
int message_type; /* 4 bytes, constant value 11, JOINT TRAJ_PT */
int comm_type; /* 4 bytes, constant value 2, REQUEST */
int reply_type; /* 4 bytes, N/A */
int seq_number; /* 4 bytes, >= 0 */
float joints[JOINT_MAX]; /* 10 4-byte floats, one per joint */
float velocity; /* 4 bytes */
float duration; /* 4 bytes */
'''
class JointTrajPtRequest(object):
def setSeqNumber(self, seq_number):
self.seq_number = seq_number
def setJoints(self, Joints):
jts = len(Joints)
if jts < SimpleMessage.NUM_JOINTS:
self.joints = Joints
self.joints += [0] * (SimpleMessage.NUM_JOINTS - jts)
else:
self.joints = Joints[0:SimpleMessage.NUM_JOINTS]
def setVelocity(self, Velocity):
self.velocity = Velocity
def setDuration(self, Duration):
self.duration = Duration
def setCmdType(self, cmd_type):
self.cmd_type = cmd_type
def __init__(self, Joints):
self.seq_number = 1
self.setJoints(Joints)
self.cmd_type = 1
self.velocity = 1
self.duration = 1
def __str__(self):
return str(self.seq_number) + ", " + str(self.joints) + \
", " + str(self.velocity) + ", " + str(self.duration)
def pack(self):
self.seq_number += 1
st = struct.pack('>iiiii', 64, 11, self.cmd_type, 0, self.seq_number)
st += struct.pack('>' + SimpleMessage.NUM_JOINTS * 'f', *self.joints)
st += struct.pack('>ff', self.velocity, self.duration)
return st
def unpack(self, packed):
try:
unpacked = struct.unpack(
'iiiii' + SimpleMessage.NUM_JOINTS * 'f' + 'ff', packed)
if unpacked[1] != 11:
return False
except BaseException:
return False
self.setSeqNumber(unpacked[4])
self.setJoints(unpacked[5:5 + SimpleMessage.NUM_JOINTS])
self.setVelocity(unpacked[5 + SimpleMessage.NUM_JOINTS])
self.setDuration(unpacked[5 + SimpleMessage.NUM_JOINTS + 1])
return True
'''
Joint trajectory point reply:
int length; /* 4 bytes, constant value should be 14x4 = 56 */
int message_type; /* 4 bytes, constant value 11, JOINT_TRAJ_PT */
int comm_type; /* 4 bytes, constant value 3, REPLY */
int reply_type; /* 4 bytes, 1 = SUCCESS, 2 = FAILURE, 3 = EXEC */
int unused_1; /* 4 bytes, N/A */
float unused_2[JOINT_MAX]; /* 10 4-byte floats, N/A */
'''
class JointTrajPtReply(object):
def setStatus(self, status):
self.status = status
def getStatus(self):
return self.status
# this corresponds to the 'unused_1' field
def setSeqNumber(self, seq_number):
self.seq_number = seq_number
def getSeqNumber(self):
return self.seq_number
def __init__(self, status=SimpleMessage.REPLY_SUCCESS):
# 1 = SUCCESS, 2 = FAILURE
self.setStatus(status)
self.setSeqNumber(1)
def __str__(self):
return str(self.status) + " " + str(self.seq_number)
def pack(self):
self.seq_number += 1
st = struct.pack('iiiii', 56, 11, 3, self.status, self.seq_number)
st += struct.pack(SimpleMessage.NUM_JOINTS * 'f',
*SimpleMessage.NUM_JOINTS * [0])
return st
def unpack(self, packed):
try:
unpacked = struct.unpack(
'iiiii' + SimpleMessage.NUM_JOINTS * 'f', packed)
if unpacked[1] != 11:
return False
except BaseException:
return False
self.setStatus(unpacked[3])
self.setSeqNumber(unpacked[4])
return True
'''
Cartesian trajectory point request:
int length; /* 4 bytes, constant value should be 13x4 = 52 */
int message_type; /* 4 bytes, constant value 31, CART TRAJ_PT */
int comm_type; /* 4 bytes, constant value 2, REQUEST */
int reply_type; /* 4 bytes, N/A */
int seq_number; /* 4 bytes, >= 0 */
float x, y, z, qx, qy, qz, qw; /* pose, Cartesian and quaternion */
float translational_speed; /* 4 bytes, > 0 */
float rotational_speed; /* 4 bytes, > 0 */
'''
class CartTrajPtRequest(object):
def setSeqNumber(self, seq_number):
self.seq_number = seq_number
def setPose(self, x, y, z, qx, qy, qz, qw):
self.x = x
self.y = y
self.z = z
self.qx = qx
self.qy = qy
self.qz = qz
self.qw = qw
def setTranslationalSpeed(self, Speed):
self.translationalSpeed = Speed
def setRotationalSpeed(self, Speed):
self.rotationalSpeed = Speed
def __init__(self, x, y, z, qx, qy, qz, qw):
self.seq_number = 1
self.setPose(x, y, z, qx, qy, qz, qw)
self.setTranslationalSpeed(1)
self.setRotationalSpeed(1)
def __str__(self):
outstr = ""
for el in self.x, self.y, self.z, self.qx, self.qy, self.qz, self.qw, self.translationalSpeed, self.rotationalSpeed:
outstr += (str(el) + " ")
return outstr
def pack(self):
self.seq_number += 1
st = struct.pack('iiiii', 52, 31, 2, 0, self.seq_number)
st += struct.pack('fffffff', self.x, self.y, self.z,
self.qx, self.qy, self.qz, self.qw)
st += struct.pack('ff', self.translationalSpeed, self.rotationalSpeed)
return st
def unpack(self, packed):
try:
unpacked = struct.unpack('iiiii' + 'fffffff' + 'ff', packed)
if unpacked[1] != 31:
return False
except BaseException:
return False
self.setSeqNumber(unpacked[4])
self.setPose(
unpacked[5],
unpacked[6],
unpacked[7],
unpacked[8],
unpacked[9],
unpacked[10],
unpacked[11])
self.setTranslationalSpeed(unpacked[12])
self.setRotationalSpeed(unpacked[13])
return True
'''
Cartesian trajectory point reply:
int length; /* 4 bytes, constant value should be 4x4 = 16 */
int message_type; /* 4 bytes, constant value 31, CART_TRAJ_PT */
int comm_type; /* 4 bytes, constant value 3, REPLY */
int reply_type; /* 4 bytes, 1 = SUCCESS, 2 = FAILURE, 3 = EXEC */
int seq_number; /* 4 bytes, sequence number echo */
'''
class CartTrajPtReply(object):
def setStatus(self, status):
self.status = status
def getStatus(self):
return self.status
def setSeqNumber(self, seq_number):
self.seq_number = seq_number
def __init__(self):
# 1 = SUCCESS, 2 = FAILURE
self.SUCCESS = 1
self.FAILURE = 2
self.EXEC = 3
self.status = self.SUCCESS
self.seq_number = 1
def __str__(self):
return str(self.status) + " " + str(self.seq_number)
def pack(self):
self.seq_number += 1
st = struct.pack('iiiii', 16, 31, 3, self.status, self.seq_number)
return st
def unpack(self, packed):
try:
unpacked = struct.unpack('iiiii', packed)
if unpacked[1] != 31:
return False
except BaseException:
return False
self.setStatus(unpacked[3])
self.setSeqNumber(unpacked[4])
return True
'''
Joint trajectory point state:
int length; /* 4 bytes, constant value should be 14x4 = 56 */
int message_type; /* 4 bytes, constant value 10, JOINT_TRAJ_PT */
int comm_type; /* 4 bytes, constant value 1, TOPIC */
int reply_type; /* 4 bytes, N/A */
int unused_1; /* 4 bytes, N/A */
float joints[JOINT_MAX]; /* 10 4-byte floats, N/A */
'''
class JointTrajPtState(object):
def setJoints(self, Joints):
jts = len(Joints)
if jts < SimpleMessage.NUM_JOINTS:
self.joints = Joints
self.joints += [0] * (SimpleMessage.NUM_JOINTS - jts)
else:
self.joints = Joints[0:SimpleMessage.NUM_JOINTS]
def __init__(self):
self.setJoints(SimpleMessage.NUM_JOINTS * [0])
def __str__(self):
return str(self.joints)
def pack(self):
st = struct.pack('iiiii', 56, 10, 1, 0, 0)
st += struct.pack(SimpleMessage.NUM_JOINTS * 'f', *self.joints)
return st
def unpack(self, packed):
try:
unpacked = struct.unpack(
'iiiii' + SimpleMessage.NUM_JOINTS * 'f', packed)
if unpacked[1] != 10:
return False
except BaseException:
return False
self.setJoints(unpacked[5:5 + SimpleMessage.NUM_JOINTS])
return True
'''
Robot status:
int length; /* 4 bytes, constant value should be 10x4 = 40 */
int message_type; /* 4 bytes, constant value 13, ROBOT_STATUS */
int comm_type; /* 4 bytes, constant value 1, TOPIC */
int reply_type; /* 4 bytes, N/A */
int drives_powered;
int e_stopped;
int error_code;
int in_error;
int in_motion;
int mode;
int motion_possible;
'''
class RobotStatus(object):
def __init__(self):
self.drives_powered = 0
self.e_stopped = 0
self.error_code = 0
self.in_error = 0
self.in_motion = 0
self.mode = 0
self.motion_possible = 0
def __str__(self):
return str(self.drives_powered) + " " + \
str(self.e_stopped) + " " + \
str(self.error_code) + " " + \
str(self.in_error) + " " + \
str(self.in_motion) + " " + \
str(self.mode) + " " + \
str(self.motion_possible)
def pack(self):
st = struct.pack(
'iiiiiiiiiii',
40,
13,
1,
0,
self.drives_powered,
self.e_stopped,
self.error_code,
self.in_error,
self.in_motion,
self.mode,
self.motion_possible)
return st
def unpack(self, packed):
try:
unpacked = struct.unpack('iiiiiiiiiii', packed)
if unpacked[1] != 13:
return False
except BaseException:
return False
self.drives_powered = unpacked[4]
self.e_stopped = unpacked[5]
self.error_code = unpacked[6]
self.in_error = unpacked[7]
self.in_motion = unpacked[8]
self.mode = unpacked[9]
self.motion_possible = unpacked[10]
return True
'''
Object information:
int id; /* unique object identifier */
float x, y, z; /* Cartesian position */
float qx, qy, qz, qw; /* quaternion orientation */
'''
class ObjectInfo(object):
def __init__(self, _id=0, _x=0, _y=0, _z=0, _qx=0, _qy=0, _qz=0, _qw=1):
self.id = _id
self.x = _x
self.y = _y
self.z = _z
self.qx = _qx
self.qy = _qy
self.qz = _qz
self.qw = _qw
def __str__(self):
return str(self.id) + ": {0} {1} {2}, {3} {4} {5} {6}".format(
self.x, self.y, self.z, self.qx, self.qy, self.qz, self.qw)
'''
Object state message:
int length; /* 4 bytes, = 16 + 32*N, number of objects */
int message_type; /* 4 bytes, constant value 40, MESSAGE_OBJECT_STATE */
int comm_type; /* 4 bytes, constant value 1, TOPIC */
int reply_type; /* 4 bytes, N/A */
int seq_number; /* 4 bytes, >= 0 */
object_state objects[]; /* array of object information */
'''
class ObjectState(object):
def __init__(self):
self.seq_number = 0
self.objects = []
def __str__(self):
outstr = ""
for obj in self.objects:
outstr += str(obj)
outstr += "\n"
return outstr
def add(self, obj):
self.objects.append(obj)
def clear(self):
self.objects = []
def pack(self):
st = struct.pack('iiiii', 16 + 32 * len(self.objects),
40, 1, 0, self.seq_number)
for obj in self.objects:
st += struct.pack('ifffffff', obj.id, obj.x, obj.y,
obj.z, obj.qx, obj.qy, obj.qz, obj.qw)
return st
def unpack(self, packed):
length = struct.unpack('i', packed[0:4])[0]
objs = (length - 16) / 32
unpacked = struct.unpack('iiiii' + objs * 'ifffffff', packed)
for i in range(0, objs):
off = 5 + i * 8
self.add(ObjectInfo(unpacked[off + 0],
unpacked[off + 1],
unpacked[off + 2],
unpacked[off + 3],
unpacked[off + 4],
unpacked[off + 5],
unpacked[off + 6],
unpacked[off + 7]))
return True
if __name__ == "__main__":
jt = JointTrajPtRequest([1, 2, 3])
print jt
njt = JointTrajPtRequest([])
if not njt.unpack(jt.pack()):
print "unpacking error"
else:
print njt
ct = CartTrajPtRequest(1, 2, 3, 0.7071, 0.0, 0.0, 0.7071)
print ct
nct = CartTrajPtRequest(0, 0, 0, 0, 0, 0, 1)
if not nct.unpack(ct.pack()):
print "unpacking error"
else:
print nct
if not nct.unpack(jt.pack()):
print "unpacking error as expected"
else:
print "unexpected unpacking success"
jtrep = JointTrajPtReply(SimpleMessage.REPLY_EXECUTING)
print jtrep
njtrep = JointTrajPtReply()
njtrep.unpack(jtrep.pack())
print njtrep
rs = RobotStatus()
rs.drives_powered = 1
rs.e_stopped = 2
rs.error_code = 3
rs.in_error = 4
rs.in_motion = 5
rs.mode = 6
rs.motion_possible = 7
print rs
nrs = RobotStatus()
if not nrs.unpack(rs.pack()):
print "unpacking error"
else:
print nrs
js = JointTrajPtState()
js.setJoints([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11])
print js
js.setJoints([101, 202, 303])
njs = JointTrajPtState()
njs.unpack(js.pack())
print njs
oi = ObjectInfo(17, 1, 2, 3, 1, 0, 0, 0)
os = ObjectState()
os.add(oi)
oi = ObjectInfo(23, 10, 23, 32, 0.7071, 0, 0, -0.7017)
os.add(oi)
print os
nos = ObjectState()
nos.unpack(os.pack())
print nos
import math
import time
import socket
from simple_message import *
class ROSComm():
"""docstring for ROSComm"""
def __init__(self, robot_ip="127.0.0.1"):
self.command = None
self.feedback = None
self.kCMDPort = 11000
self.kFDBPort = 11002
self.robot_ip = robot_ip
def connect(self):
self.command = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.command.connect((self.robot_ip, self.kCMDPort))
self.feedback = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.feedback.connect((self.robot_ip, self.kFDBPort))
def send_cmd(self, msg):
self.command.send(msg)
def streaming_cmd(self, payload):
pass
def downloading_cmd(self, payload):
pass
def cmd(self, payload, msg_type=42):
""" msg_type 42 for streaming
44 for downloading
"""
if msg_type == 42:
streaming_cmd(payload)
elif msg_type == 44:
downloading_cmd(payload)
else:
raise Exception("Unknown msg_type")
def get_fdb(self):
return self.feedback.recv(1024)
def deg2rad(data):
"""Convert degrees to radians
"""
return map(math.radians, data)
if __name__ == '__main__':
# comm = ROSComm()
comm = ROSComm("192.168.5.24")
comm.connect()
# comm.send_cmd()
traj = [[-68.000, -40.000, 210.000, -15.000, 20.000, 30.000],
[-54.957, -60.518, 221.253, -15.000, 12.888, 30.000],
[-17.428, -71.758, 238.594, -15.000, 13.577, 30.000],
[20.100, -70.936, 238.511, -15.000, 14.266, 30.000],
[57.628, -55.583, 220.756, -15.000, 14.955, 30.000],
[65.000, -40.000, 215.000, -15.000, 20.000, 30.000]]
msgs = [JointTrajPtRequest(deg2rad(jts)) for jts in traj]
# print comm.get_fdb()
# exit()
for i, jts in enumerate(msgs):
jts.setSeqNumber(i + 1)
print jts
comm.send_cmd(jts.pack())
time.sleep(1)
jts_end = JointTrajPtRequest(traj[-1])
jts_end.setSeqNumber(-4)
comm.send_cmd(jts_end.pack())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment