Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Code for this balancing robot: https://www.youtube.com/watch?v=YyU3X6ZazJw . A PID controller was used, based on IMU data from a Kalman filter. The program is written in python and multithreaded.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import math
import os
import sys
import time, datetime
from multiprocessing import Process, Value, Queue
from OSC import OSCServer
from roboclaw import Roboclaw
import RTIMU
sys.path.append("/home/pi/Adafruit_Python_GPIO")
sys.path.append('.')
sys.path.append("/usr/lib/python3")
sys.path.append("/usr/lib/python2.7/dist-packages")
# Cette partie déclare les coefficients des régulateurs
delta_t = 0.001
tt = 1000 * delta_t
cp = 20
cd = 0.2
ci = 0
cp = 16.0
cd = 0.15
ci = 0
pp = 0.01
pd = 0.08
pp = pp / 10
pd = pd / 10
first = 0
inity = 0
initx = 0
# La fonction qui lit les signaux de la télécommande
def remote(run, anglex, angley, zeroPos, pos, RunningProcesses, motors_on):
RunningProcesses.put(os.getpid())
# Création d'un objet capable de lire ces signaux
server = OSCServer(("192.168.88.251", 8000)) # Adresse IP du Raspberry Pi sur le réseau.
first = 0
inity = 0
initx = 0
# Comment réagir aux différents boutons
def remoteOn(path, tags, args, source):
global run, first
print("run!")
run.value = 1
first = 0
def RemoteOff(path, tags, args, source):
global run
print("stop!")
run.value = 0
zeroPos.value += pos.value
def Motors_on_remote(path, tags, args, source):
global motors_on
motors_on.value = 1 - motors_on.value
def accel(path, tags, args, source):
global first, initx, inity, anglex, angley
y = float(args[0])
x = float(args[1])
z = float(args[2])
R = math.sqrt(x ** 2 + y ** 2 + z ** 2)
Arx = math.acos(x / R) * 180 / math.pi - initx
Ary = math.acos(y / R) * 180 / math.pi - inity
if first == 0:
inity += Ary
initx += Arx
first = 1
Arx = 0
Ary = 0
time.sleep(0.01)
anglex.value = round(float(constrain(Arx, -30, 30)) / 60, 3)
angley.value = round(float(constrain(Ary, -45, 45)) / 90, 3)
server.addMsgHandler("/1/run", remoteOn)
server.addMsgHandler("/1/stop", RemoteOff)
server.addMsgHandler("/accxyz", accel)
server.addMsgHandler("/1/motors", Motors_on_remote)
# lecture régulière de la télécommande
while True:
try:
server.handle_request()
time.sleep(0.01)
except KeyboardInterrupt:
for i in RunningProcesses.get():
os.kill(i, 9)
server.close()
# lecture du capteur.
def ReadSensor(angle, gyro, RunningProcesses):
RunningProcesses.put(os.getpid())
SETTINGS_FILE = "RTIMULib"
print("Using settings file " + SETTINGS_FILE + ".ini")
if not os.path.exists(SETTINGS_FILE + ".ini"):
print("Settings file does not exist, will be created")
s = RTIMU.Settings(SETTINGS_FILE)
imu = RTIMU.RTIMU(s)
print("IMU Name: " + imu.IMUName())
if (not imu.IMUInit()):
print("IMU Init Failed")
sys.exit(1)
else:
pass
# this is a good time to set any fusion parameters
imu.setSlerpPower(0.02)
imu.setGyroEnable(True)
imu.setAccelEnable(True)
imu.setCompassEnable(True)
while 1:
try:
if imu.IMURead():
data = imu.getIMUData()
# les valeurs sont décalées de 0.7 degrés pour que l'angle soit 0 à l'équilibre.
measurementE = - 0.7 + math.degrees(data["fusionPose"][1]) # - angle_goal
measurementG = math.degrees(data["gyro"][2])
angle.value = measurementE
gyro.value = measurementG
time.sleep(0.001)
except KeyboardInterrupt:
for i in RunningProcesses.get():
os.kill(i, 9)
# programme principal
# s'occupe de joindre les différentes parties en régulant et communiquant avec les moteurs
def MainLoop(e, g, anglex, angley, run, zeroPos, pos, RunningProcesses, motors_on):
RunningProcesses.put(os.getpid())
f = open("Fusionlog.log", "w")
rc = Roboclaw("/dev/ttyACM0", 38400)
if rc.Open():
print("Roboclaw connected!")
else:
print("roboclaw not connected!")
exit()
address = 0x80
version = rc.ReadVersion(address)
if version[0] is False:
print("GETVERSION Failed")
else:
print(repr(version[1]))
rc.ForwardMixed(address, 0)
rc.TurnRightMixed(address, 0)
print("voltage" + "," + str(rc.ReadMainBatteryVoltage(address)[1] / 10 + 0.4))
zeroPos.value = 0
start_pos = (rc.ReadEncM1(address)[1] + rc.ReadEncM2(address)[1]) * 0.5
real_speed = 0
beginning = time.mktime(datetime.datetime.now().timetuple()) * 1e3 + datetime.datetime.now().microsecond / 1e3
now = beginning
totaldt = 0
angle = 0
gyro = 0
counter = 0
while True:
try:
counter += 1
now = datetime.datetime.now()
totaldt = time.mktime(now.timetuple()) * 1e3 + now.microsecond / 1e3 - beginning
# lecture des mesures
pos.value = (rc.ReadEncM1(address)[1] + rc.ReadEncM2(address)[1]) * 0.5 - start_pos - zeroPos.value
real_speed = (rc.ReadSpeedM1(address)[1] + rc.ReadSpeedM2(address)[1]) * 0.5
# premier régulateur PD. Une distinction est faite entre le cas avec ou sans télécommande.
if run.value == 0:
angle_goal = constrain(constrain(pp * (pos.value), -5, 5) + pd * real_speed, -6, 6)
else:
angle_goal = constrain((10000 * angley.value) * pp + pd * real_speed, -6, 6)
angle = e.value + angle_goal
gyro = g.value
# arrêter les moteurs si le robot est trop penché
if abs(angle) > 40 and totaldt > 1000 and motors_on.value == 1:
print("leaning too far")
rc.ForwardMixed(address, 0)
motors_on.value = 0
# déterminer la réaction des moteurs avec le deuxième régulateur PD
stemp = (cp * angle - cd * gyro)
stemp2 = constrain(stemp, -127, 127)
speed = motors_on.value * int(stemp2)
if speed >= 0:
rc.ForwardMixed(address, speed)
else:
rc.BackwardMixed(address, - speed)
if run.value == 1:
if anglex.value > 0:
rc.TurnRightMixed(address, int(anglex.value * 20))
else:
rc.TurnLeftMixed(address, int(- anglex.value * 20))
# Logging
print("values" + "," + str(round(angle, 3)) + "," + str(round(gyro, 3)) + "," + str(round(speed, 3)) + "," + str(round(constrain(pos.value, -5 / pp, 5 / pp), 3)) + "," + str(run.value) + "," + str(angley.value) + "," + str(anglex.value) + "," + "")
f.write("{" + str(totaldt) + "," + str(round(angle, 3)) + "," + str(round(gyro, 3)) + "," + str(round(speed, 3)) + "}\n")
# écriture dans un fichier
if counter > 9: # Les valeurs copiées 10 fois par seconde.
counter = 0
f.flush()
sys.stdout.flush()
except KeyboardInterrupt:
rc.ForwardMixed(address, 0)
for i in RunningProcesses.get():
os.kill(i, 9)
def constrain(val, min_val, max_val):
return min(max_val, max(min_val, val))
# s'occupe de démarrer les trois parties, et déclare les variables partagées.
if __name__ == '__main__':
RunningProcesses = Queue()
e = Value('d', 0.0)
g = Value('d', 0.0)
run = Value('i', 0)
angley = Value('d', 0.0)
anglex = Value('d', 0.0)
zeroPos = Value('d', 0.0)
pos = Value('d', 0.0)
motors_on = Value('i', 1)
p = Process(target=ReadSensor, args=(e, g, RunningProcesses))
q = Process(target=MainLoop, args=(e, g, anglex, angley, run, zeroPos, pos, RunningProcesses, motors_on))
rt = Process(target=remote, args=(run, anglex, angley, zeroPos, pos, RunningProcesses, motors_on))
p.start()
q.start()
rt.start()
@yannisulrich
Copy link
Author

yannisulrich commented Nov 4, 2018

https://www.youtube.com/watch?v=YyU3X6ZazJw for a video of the final result.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment