Skip to content

Instantly share code, notes, and snippets.

@miettal
Created September 24, 2018 06:11
Show Gist options
  • Save miettal/18b3d092133b5d30ae5e7cb04ca532f4 to your computer and use it in GitHub Desktop.
Save miettal/18b3d092133b5d30ae5e7cb04ca532f4 to your computer and use it in GitHub Desktop.
import curses
import locale
import time
import threading
from pykobuki.kobuki import Kobuki, FeedbackListener
locale.setlocale(locale.LC_ALL, "")
state = "STOP"
angle = 0.0
def send_thread() :
global state
speed_min = 0.0
state_ = "STOP"
speed = speed_min
while True :
if state_ == "STOP" and state_ != state :
state_ = state
speed = speed_min
if state_ == state:
if speed < 1.0 : speed += 0.02
else :
if speed_min < speed : speed -= 0.02
else :
state_ = "STOP"
speed = speed_min
if state_ == "FORWARD" :
kobuki1.base_control(int(700.0*speed), 0)
elif state_ == "BACKYARD" :
kobuki1.base_control(0xffff-int(700.0*speed), 0)
elif state_ == "LEFT" :
kobuki1.base_control(int(((3.14)*speed * 230.0)/2.0), 1)
elif state_ == "RIGHT" :
kobuki1.base_control(int(((3.14)*speed * 230.0)/2.0), -1)
else :
kobuki1.base_control(0, 0)
time.sleep(0.01)
def stop_thread() :
global state
last_change_time = time.time()
old_state = state
while True :
if old_state != state :
old_state = state
last_change_time = time.time()
if time.time() - last_change_time > 0.1 :
state = "STOP"
time.sleep(0.01)
def loop(stdscr, *args, **kwds) :
global state, angle
stdscr.nodelay(1)
while True :
stdscr.erase()
stdscr.addstr("Angle:{:+07.2f}".format(angle))
ch = stdscr.getch()
if ch == -1 :
pass
else :
if ch == curses.KEY_UP : state = "FORWARD"
elif ch == curses.KEY_DOWN : state = "BACKYARD"
elif ch == curses.KEY_RIGHT : state = "RIGHT"
elif ch == curses.KEY_LEFT : state = "LEFT"
else : pass
time.sleep(0.01)
class MyFeedbackListener(FeedbackListener) :
def onInertialSensor(self, angle_, angle_rate) :
global angle
angle = angle_
if __name__ == "__main__" :
kobuki1 = Kobuki('/dev/ttyUSB0')
kobuki1.feedback(MyFeedbackListener())
t2=threading.Thread(target=send_thread)
t2.setDaemon(True)
t2.start()
t2=threading.Thread(target=stop_thread)
t2.setDaemon(True)
t2.start()
curses.wrapper(loop)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment