Last active
August 10, 2018 21:49
-
-
Save Mateo-S/cc93b68ce13ba7087a734df306287e22 to your computer and use it in GitHub Desktop.
Autonomous driver prototype for Chrono
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import sys | |
import DriverMessages_pb2 | |
import math | |
import gpxpy | |
import gpxpy.gpx | |
class Driver: | |
def __init__(self): | |
print("Initializing Driver...\n") | |
# initialize driver inputs (e.g. lidar, grps, imu, time, light) | |
self.lidar = DriverMessages_pb2.lidar() | |
self.gps = DriverMessages_pb2.gps() | |
self.imu = DriverMessages_pb2.imu() | |
self.time = DriverMessages_pb2.time() | |
self.light = DriverMessages_pb2.light() | |
# initialize local vars | |
self.inputLidar, self.inputGPS, self.inputIMU, self.inputTime, self.inputLight = 0, 0, 0, 0, 0 | |
self.lastOutputValue = None | |
self.outputValue = None | |
self.totalDistance = None | |
self.ACCELERATE = 0 | |
self.BRAKE = 1 | |
self.STEER = 2 | |
print("Variables initialized succesfully\n") | |
#Declaring functions for reciving and parsing Driver Messages (e.g. SYN_Sedan) | |
def ParseLidarMessage(self, msg): | |
# lidar = DriverMessages_pb2.lidar() | |
self.lidar.ParseFromString(msg) | |
print("Lidar Input:\n"+self.lidar.__str__()+"\n") | |
def ParseGPSMessage(self, msg): | |
# gps = DriverMessages_pb2.gps() | |
inputGPS = self.gps.ParseFromString(msg) | |
print("GPS Input:\n"+self.gps.__str__()+"\n") | |
def ParseIMUMessage(self, msg): | |
# imu = DriverMessages_pb2.imu() | |
inputIMU = self.imu.ParseFromString(msg) | |
print("IMU Input:\n"+self.imu.__str__()+"\n") | |
def ParseTimeMessage(self, msg): | |
# time = DriverMessages_pb2.time() | |
inputTime = self.time.ParseFromString(msg) | |
print("Time Input:\n"+self.time.__str__()+"\n") | |
def ParseLightMessage(self, msg): | |
# light = DriverMessages_pb2.light() | |
inputLight = self.light.ParseFromString(msg) | |
print("Light Input:\n"+self.light.__str__()+"\n") | |
# Current testing navigation from '870 W Dayton St' (43.07101591798705, -89.39995136927791) to '455 N Park St' | |
# GPS points to go to: | |
# 43.070948, -89.399946 (Start [879 W Dayton]) | |
# 43.070962, -89.400736 (W Dayton and N Park) | |
# 43.072067, -89.400663 (W Johnson and N Park) | |
# 43.073260, -89.400533 (University Ave and N Park) | |
# 43.074212, -89.400562 (Destination [455 N Park]) | |
# goToCoord is a function that outputs steering, accelration, and braking needed to reach a target point | |
# input current and target coordinates as well as imu data... returns 3-tuple movementOutputValue | |
def goToCoord(self, currentLat, currentLong, targetLat, targetLong, imuX, imuY, imuZ, currentTarget): | |
lastAccel, lastBrake, lastSteer = self.lastOutputValue | |
currentAngle = imuX #get the current angle of the vehicle | |
# vars 'ef', 'ij', and 'gh' represent points of a triangle | |
ef = (currentLat, currentLong) # point representing vehicle | |
ij = (targetLat, targetLong) # target point | |
gh = (currentLat, targetLong) # point complting the triangle | |
# distance between points | |
a = math.hypot(currentLat - targetLat, targetLong - targetLong) | |
b = math.hypot(targetLat - currentLat, targetLong - currentLong) #distance between current posision and target | |
c = math.hypot(currentLat - currentLat, targetLong - currentLong) | |
if currentTarget == 0: #currentTarget is what target the vehicle is navigating to (e.g. 1/23) | |
self.totalDistance = b | |
goalAngle = math.sqrt(a**2 + b**2 - (2*a*b) * math.cos(c)) #angle in radians that the vehicle needs to travel in relative to the XY plane | |
# How much adjusting needs to be done to get back on target, currently | |
# steering: last angle ± 0.15 * (distance left to target / total distance from start to target) | |
# TODO tweak until it works | |
if (goalAngle % math.pi) > (currentAngle % math.pi): | |
lastSteer += 0.15*(b/self.totalDistance) | |
elif (goalAngle % math.pi) < (currentAngle % math.pi): | |
lastSteer -= 0.15*(b/self.totalDistance) | |
# return 3-tuple with acceleration, braking, and steering | |
return (lastAccel, lastBrake, lastSteer) | |
def chooseAction(self): | |
outputType = 0 | |
outputValue = (1,0,0) #outputValue is a tuple that represents the scripts output (accelration, braking, steering) | |
lastOutputValue = (0,0,0) #TODO set this to the last output | |
currentTarget = 0 # current target point (e.g. 1/23) | |
tolerance = 0.0000005 #approx 5cm tolerance on how close the vehicle should be to the point | |
if not self.gps.IsInitialized(): # throw exeception if driver sends bad GPS message | |
print("Potential error, GPS message is incomplete\n") | |
#import gps coords from gpx file | |
path = open("map.gpx", 'r') | |
gpx = gpxpy.parse(path) | |
allGpsTargets = [] | |
for track in gpx.tracks: | |
for segment in track.segments: | |
for point in segment.points: | |
allGpsTargets.append((point.latitude, point.longitude)) | |
print(allGpsTargets) | |
# set imu and gps coords from driver messages | |
imuXYZ = (self.imu.q1, self.imu.q2, self.imu.q3) | |
gpsCoords = (self.gps.latitude, self.gps.longitude) | |
self.lastOutputValue = outputValue | |
#go to next target until youre done with the last one | |
if currentTarget != len(allGpsTargets): | |
print("Current target: "+str(currentTarget)+"/"+str(len(allGpsTargets))) # say what the current target is (e.g. 1/23) | |
outputValue = self.goToCoord(*gpsCoords, *allGpsTargets[currentTarget], *imuXYZ, currentTarget) # set movement tuple (acceleration, braking, steering) to goToCoord's output | |
if math.isclose(gpsCoords[0], allGpsTargets[currentTarget][0], rel_tol=tolerance) and math.isclose(gpsCoords[1], allGpsTargets[currentTarget][1], rel_tol=tolerance): #if the vehicle is within the tolerance (5cm) of the target, go to the next target | |
currentTarget += 1 | |
elif currentTarget == len(allGpsTargets): # if you've reached all the targets (completed the route) | |
print("You (should) have arrived at your destination!") | |
print(outputValue) | |
print("Accleration output: "+str(outputValue[0])+"\nBraking output: "+str(outputValue[1])+"\nSteering output: "+str(outputValue[2])+"\n") | |
return (outputType, outputValue) | |
#method to output control message to Driver (output accel, braking, and steering values) | |
def generateControlMessage(self): | |
control = DriverMessages_pb2.control() | |
actionType, actionValue = self.chooseAction() | |
control.throttle = actionValue[0] | |
control.braking = actionValue[1] | |
control.steering = actionValue[2] | |
return control.SerializeToString() #output command (brake, accelerate, etc.) | |
def main(): | |
if (len(sys.argv) != 3): #catch exception if you don't pass 3 vars | |
print("Usage:") | |
print("\tpythonDriver.py HOST PORT\n") | |
sys.exit(1) | |
# init driver | |
driver = Driver() | |
# print(repr(driver.generateControlMessage())) | |
HOST = sys.argv[1] | |
PORT = int(sys.argv[2]) | |
#Create UDP socket for Driver to .py script communication, and bind it to ports 9000 and 9001 | |
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: | |
s.bind((HOST, PORT)) | |
print("Socket bound successfully. Bound to "+HOST+":"+str(PORT)) | |
while True: #keep accepting the Driver's sensor data over UDP | |
message, addr = s.recvfrom(1024) | |
driver.ParseLidarMessage(message[1:]) | |
message, addr = s.recvfrom(1024) | |
driver.ParseGPSMessage(message[1:]) | |
message, addr = s.recvfrom(1024) | |
driver.ParseIMUMessage(message[1:]) | |
message, addr = s.recvfrom(1024) | |
driver.ParseTimeMessage(message[1:]) | |
message, addr = s.recvfrom(1024) | |
driver.ParseLightMessage(message[1:]) | |
control = driver.generateControlMessage() | |
s.sendto(control , addr) | |
if __name__ == '__main__': | |
try: | |
main() | |
except KeyboardInterrupt: # a cleaner way to show a KeyboardInterrupt | |
print("Script Halted via KeyboardInturrupt") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment