Skip to content

Instantly share code, notes, and snippets.

@LimHyungTae
Created July 7, 2022 07:21
Show Gist options
  • Save LimHyungTae/70ab85665e32d637fb484634fa978948 to your computer and use it in GitHub Desktop.
Save LimHyungTae/70ab85665e32d637fb484634fa978948 to your computer and use it in GitHub Desktop.
ROS Jackal Telekeyop in Window
from __future__ import print_function
import threading
import rospy
from geometry_msgs.msg import Twist
from geometry_msgs.msg import TwistStamped
import signal
import sys
import keyboard
from select import select
if sys.platform == 'win32':
import msvcrt
else:
import termios
import tty
W_KEY = 119
S_KEY = 115
A_KEY = 97
D_KEY = 100
Q_KEY = 113
SPACE_KEY = 32
TwistMsg = Twist
msg = """
\033[1;32m* 'w'/'s' keys : Update linear velocity incre/decre.
* 'a'/'d' keys : Update angular velocity incre/decre.
* Space Bar : Reset linear/angular velocities.
* 'q' : Quit.\033[0m
"""
TRANS_VEL = 0
ROT_VEL = 0
def signal_handler(sig, frame):
print('You pressed Ctrl+C!')
sys.exit(0)
def vels(speed, turn):
return "current:\tTrans. speed %s\tRot. speed %s " % (speed, turn)
class PublishThread(threading.Thread):
def __init__(self, rate):
super(PublishThread, self).__init__()
self.publisher = rospy.Publisher('cmd_vel', TwistMsg, queue_size = 1)
self.x = 0.0
self.yaw = 0.0
self.condition = threading.Condition()
self.done = False
# Set timeout to None if rate is 0 (causes new_message to wait forever
# for new data to publish)
if rate != 0.0:
self.timeout = 1.0 / rate
else:
self.timeout = None
self.start()
def wait_for_subscribers(self):
i = 0
while not rospy.is_shutdown() and self.publisher.get_num_connections() == 0:
if i == 4:
print("Waiting for subscriber to connect to {}".format(self.publisher.name))
rospy.sleep(0.5)
i += 1
i = i % 5
if rospy.is_shutdown():
raise Exception("Got shutdown request before subscribers connected")
def update(self, trans_vel, rot_vel):
self.x = trans_vel
self.yaw = rot_vel
# Notify publish thread that we have a new message.
# self.condition.notify()
# self.condition.release()
def stop(self):
self.done = True
self.update(0, 0)
self.join()
def run(self):
twist_msg = TwistMsg()
if stamped:
twist = twist_msg.twist
twist_msg.header.stamp = rospy.Time.now()
twist_msg.header.frame_id = twist_frame
else:
twist = twist_msg
while not self.done:
if stamped:
twist_msg.header.stamp = rospy.Time.now()
self.condition.acquire()
# Wait for a new message or timeout.
self.condition.wait(self.timeout)
# Copy state into twist message.
twist.linear.x = self.x
twist.linear.y = 0
twist.linear.z = 0
twist.angular.x = 0
twist.angular.y = 0
twist.angular.z = self.yaw
self.condition.release()
# Publish.
# print(twist.linear.x, " / ", twist.angular.z)
self.publisher.publish(twist_msg)
# Publish stop message when thread exits.
twist.linear.x = 0
twist.linear.y = 0
twist.linear.z = 0
twist.angular.x = 0
twist.angular.y = 0
twist.angular.z = 0
self.publisher.publish(twist_msg)
def getKey(settings, timeout):
if sys.platform == 'win32':
# getwch() returns a string on Windows
key = msvcrt.getwch()
else:
tty.setraw(sys.stdin.fileno())
# sys.stdin.read() returns a string on Linux
rlist, _, _ = select([sys.stdin], [], [], timeout)
if rlist:
key = sys.stdin.read(1)
else:
key = ''
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings)
return key
def saveTerminalSettings():
if sys.platform == 'win32':
return None
return termios.tcgetattr(sys.stdin)
def restoreTerminalSettings(old_settings):
if sys.platform == 'win32':
return
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
def cutoff_velocity(value, max_abs_value):
cutoff_value = value
if (value > max_abs_value):
cutoff_value = max_abs_value
elif (value < -max_abs_value):
cutoff_value = -max_abs_value
return cutoff_value
if __name__=="__main__":
settings = saveTerminalSettings()
rospy.init_node('teleop_twist_keyboard')
trans_vel_incre = rospy.get_param("~trans_vel_incre", 0.1)
trans_vel_max = rospy.get_param("~trans_vel_max", 1.5)
rot_vel_incre = rospy.get_param("~rot_vel_incre", 0.02)
rot_vel_max = rospy.get_param("~rot_vel_max", 1.0)
repeat = rospy.get_param("~repeat_rate", 10.0)
key_timeout = rospy.get_param("~key_timeout", 0.1)
stamped = rospy.get_param("~stamped", False)
twist_frame = rospy.get_param("~frame_id", '')
if stamped:
TwistMsg = TwistStamped
pub_thread = PublishThread(repeat)
TRANS_VEL = 0
ROT_VEL = 0
print(msg)
try:
# pub_thread.wait_for_subscribers()
pub_thread.update(TRANS_VEL, ROT_VEL)
print("HEY?")
print(msg)
print(vels(TRANS_VEL, ROT_VEL))
while(1):
signal.signal(signal.SIGINT, signal_handler)
key = getKey(settings, key_timeout)
ts_vel_incremental = 0.0
angular_vel_incremental = 0.0
if (len(key) != 0):
print(ord(key))
if ord(key) == W_KEY:
print("\033[1;32m 'w' key is pressed \033[0m")
TRANS_VEL += trans_vel_incre
elif ord(key) == S_KEY:
print("\033[1;32m 's' key is pressed \033[0m")
TRANS_VEL -= trans_vel_incre
elif ord(key) == A_KEY:
print("\033[1;32m 'a' key is pressed \033[0m")
ROT_VEL += rot_vel_incre
elif ord(key) == D_KEY:
print("\033[1;32m 'd' key is pressed \033[0m")
ROT_VEL -= rot_vel_incre
elif ord(key) == SPACE_KEY:
print("Stop!")
TRANS_VEL = 0
ROT_VEL = 0
elif ord(key) == Q_KEY:
print("Exit!")
break
# Cut off values for the safety issue
TRANS_VEL = cutoff_velocity(TRANS_VEL, trans_vel_max)
ROT_VEL = cutoff_velocity(ROT_VEL, rot_vel_max)
pub_thread.update(TRANS_VEL, ROT_VEL)
except Exception as e:
print(e)
finally:
pub_thread.stop()
restoreTerminalSettings(settings)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment