Code for this balancing robot: https://www.youtube.com/watch?v=YyU3X6ZazJw . A PID controller was used, based on IMU data from a Kalman filter. The program is written in python and multithreaded.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import math | |
import os | |
import sys | |
import time, datetime | |
from multiprocessing import Process, Value, Queue | |
from OSC import OSCServer | |
from roboclaw import Roboclaw | |
import RTIMU | |
sys.path.append("/home/pi/Adafruit_Python_GPIO") | |
sys.path.append('.') | |
sys.path.append("/usr/lib/python3") | |
sys.path.append("/usr/lib/python2.7/dist-packages") | |
# Cette partie déclare les coefficients des régulateurs | |
delta_t = 0.001 | |
tt = 1000 * delta_t | |
cp = 20 | |
cd = 0.2 | |
ci = 0 | |
cp = 16.0 | |
cd = 0.15 | |
ci = 0 | |
pp = 0.01 | |
pd = 0.08 | |
pp = pp / 10 | |
pd = pd / 10 | |
first = 0 | |
inity = 0 | |
initx = 0 | |
# La fonction qui lit les signaux de la télécommande | |
def remote(run, anglex, angley, zeroPos, pos, RunningProcesses, motors_on): | |
RunningProcesses.put(os.getpid()) | |
# Création d'un objet capable de lire ces signaux | |
server = OSCServer(("192.168.88.251", 8000)) # Adresse IP du Raspberry Pi sur le réseau. | |
first = 0 | |
inity = 0 | |
initx = 0 | |
# Comment réagir aux différents boutons | |
def remoteOn(path, tags, args, source): | |
global run, first | |
print("run!") | |
run.value = 1 | |
first = 0 | |
def RemoteOff(path, tags, args, source): | |
global run | |
print("stop!") | |
run.value = 0 | |
zeroPos.value += pos.value | |
def Motors_on_remote(path, tags, args, source): | |
global motors_on | |
motors_on.value = 1 - motors_on.value | |
def accel(path, tags, args, source): | |
global first, initx, inity, anglex, angley | |
y = float(args[0]) | |
x = float(args[1]) | |
z = float(args[2]) | |
R = math.sqrt(x ** 2 + y ** 2 + z ** 2) | |
Arx = math.acos(x / R) * 180 / math.pi - initx | |
Ary = math.acos(y / R) * 180 / math.pi - inity | |
if first == 0: | |
inity += Ary | |
initx += Arx | |
first = 1 | |
Arx = 0 | |
Ary = 0 | |
time.sleep(0.01) | |
anglex.value = round(float(constrain(Arx, -30, 30)) / 60, 3) | |
angley.value = round(float(constrain(Ary, -45, 45)) / 90, 3) | |
server.addMsgHandler("/1/run", remoteOn) | |
server.addMsgHandler("/1/stop", RemoteOff) | |
server.addMsgHandler("/accxyz", accel) | |
server.addMsgHandler("/1/motors", Motors_on_remote) | |
# lecture régulière de la télécommande | |
while True: | |
try: | |
server.handle_request() | |
time.sleep(0.01) | |
except KeyboardInterrupt: | |
for i in RunningProcesses.get(): | |
os.kill(i, 9) | |
server.close() | |
# lecture du capteur. | |
def ReadSensor(angle, gyro, RunningProcesses): | |
RunningProcesses.put(os.getpid()) | |
SETTINGS_FILE = "RTIMULib" | |
print("Using settings file " + SETTINGS_FILE + ".ini") | |
if not os.path.exists(SETTINGS_FILE + ".ini"): | |
print("Settings file does not exist, will be created") | |
s = RTIMU.Settings(SETTINGS_FILE) | |
imu = RTIMU.RTIMU(s) | |
print("IMU Name: " + imu.IMUName()) | |
if (not imu.IMUInit()): | |
print("IMU Init Failed") | |
sys.exit(1) | |
else: | |
pass | |
# this is a good time to set any fusion parameters | |
imu.setSlerpPower(0.02) | |
imu.setGyroEnable(True) | |
imu.setAccelEnable(True) | |
imu.setCompassEnable(True) | |
while 1: | |
try: | |
if imu.IMURead(): | |
data = imu.getIMUData() | |
# les valeurs sont décalées de 0.7 degrés pour que l'angle soit 0 à l'équilibre. | |
measurementE = - 0.7 + math.degrees(data["fusionPose"][1]) # - angle_goal | |
measurementG = math.degrees(data["gyro"][2]) | |
angle.value = measurementE | |
gyro.value = measurementG | |
time.sleep(0.001) | |
except KeyboardInterrupt: | |
for i in RunningProcesses.get(): | |
os.kill(i, 9) | |
# programme principal | |
# s'occupe de joindre les différentes parties en régulant et communiquant avec les moteurs | |
def MainLoop(e, g, anglex, angley, run, zeroPos, pos, RunningProcesses, motors_on): | |
RunningProcesses.put(os.getpid()) | |
f = open("Fusionlog.log", "w") | |
rc = Roboclaw("/dev/ttyACM0", 38400) | |
if rc.Open(): | |
print("Roboclaw connected!") | |
else: | |
print("roboclaw not connected!") | |
exit() | |
address = 0x80 | |
version = rc.ReadVersion(address) | |
if version[0] is False: | |
print("GETVERSION Failed") | |
else: | |
print(repr(version[1])) | |
rc.ForwardMixed(address, 0) | |
rc.TurnRightMixed(address, 0) | |
print("voltage" + "," + str(rc.ReadMainBatteryVoltage(address)[1] / 10 + 0.4)) | |
zeroPos.value = 0 | |
start_pos = (rc.ReadEncM1(address)[1] + rc.ReadEncM2(address)[1]) * 0.5 | |
real_speed = 0 | |
beginning = time.mktime(datetime.datetime.now().timetuple()) * 1e3 + datetime.datetime.now().microsecond / 1e3 | |
now = beginning | |
totaldt = 0 | |
angle = 0 | |
gyro = 0 | |
counter = 0 | |
while True: | |
try: | |
counter += 1 | |
now = datetime.datetime.now() | |
totaldt = time.mktime(now.timetuple()) * 1e3 + now.microsecond / 1e3 - beginning | |
# lecture des mesures | |
pos.value = (rc.ReadEncM1(address)[1] + rc.ReadEncM2(address)[1]) * 0.5 - start_pos - zeroPos.value | |
real_speed = (rc.ReadSpeedM1(address)[1] + rc.ReadSpeedM2(address)[1]) * 0.5 | |
# premier régulateur PD. Une distinction est faite entre le cas avec ou sans télécommande. | |
if run.value == 0: | |
angle_goal = constrain(constrain(pp * (pos.value), -5, 5) + pd * real_speed, -6, 6) | |
else: | |
angle_goal = constrain((10000 * angley.value) * pp + pd * real_speed, -6, 6) | |
angle = e.value + angle_goal | |
gyro = g.value | |
# arrêter les moteurs si le robot est trop penché | |
if abs(angle) > 40 and totaldt > 1000 and motors_on.value == 1: | |
print("leaning too far") | |
rc.ForwardMixed(address, 0) | |
motors_on.value = 0 | |
# déterminer la réaction des moteurs avec le deuxième régulateur PD | |
stemp = (cp * angle - cd * gyro) | |
stemp2 = constrain(stemp, -127, 127) | |
speed = motors_on.value * int(stemp2) | |
if speed >= 0: | |
rc.ForwardMixed(address, speed) | |
else: | |
rc.BackwardMixed(address, - speed) | |
if run.value == 1: | |
if anglex.value > 0: | |
rc.TurnRightMixed(address, int(anglex.value * 20)) | |
else: | |
rc.TurnLeftMixed(address, int(- anglex.value * 20)) | |
# Logging | |
print("values" + "," + str(round(angle, 3)) + "," + str(round(gyro, 3)) + "," + str(round(speed, 3)) + "," + str(round(constrain(pos.value, -5 / pp, 5 / pp), 3)) + "," + str(run.value) + "," + str(angley.value) + "," + str(anglex.value) + "," + "") | |
f.write("{" + str(totaldt) + "," + str(round(angle, 3)) + "," + str(round(gyro, 3)) + "," + str(round(speed, 3)) + "}\n") | |
# écriture dans un fichier | |
if counter > 9: # Les valeurs copiées 10 fois par seconde. | |
counter = 0 | |
f.flush() | |
sys.stdout.flush() | |
except KeyboardInterrupt: | |
rc.ForwardMixed(address, 0) | |
for i in RunningProcesses.get(): | |
os.kill(i, 9) | |
def constrain(val, min_val, max_val): | |
return min(max_val, max(min_val, val)) | |
# s'occupe de démarrer les trois parties, et déclare les variables partagées. | |
if __name__ == '__main__': | |
RunningProcesses = Queue() | |
e = Value('d', 0.0) | |
g = Value('d', 0.0) | |
run = Value('i', 0) | |
angley = Value('d', 0.0) | |
anglex = Value('d', 0.0) | |
zeroPos = Value('d', 0.0) | |
pos = Value('d', 0.0) | |
motors_on = Value('i', 1) | |
p = Process(target=ReadSensor, args=(e, g, RunningProcesses)) | |
q = Process(target=MainLoop, args=(e, g, anglex, angley, run, zeroPos, pos, RunningProcesses, motors_on)) | |
rt = Process(target=remote, args=(run, anglex, angley, zeroPos, pos, RunningProcesses, motors_on)) | |
p.start() | |
q.start() | |
rt.start() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
https://www.youtube.com/watch?v=YyU3X6ZazJw for a video of the final result.