Skip to content

Instantly share code, notes, and snippets.

@wroscoe
Created October 18, 2017 05:25
Show Gist options
  • Save wroscoe/d7005216ab36cd4fa6ae0a5b428cbc1f to your computer and use it in GitHub Desktop.
Save wroscoe/d7005216ab36cd4fa6ae0a5b428cbc1f to your computer and use it in GitHub Desktop.
Drive function used on donkey car for avc race
def drive(cfg, model_path=None, use_joystick=False):
'''
Construct a working robotic vehicle from many parts.
Each part runs as a job in the Vehicle loop, calling either
it's run or run_threaded method depending on the constructor flag `threaded`.
All parts are updated one after another at the framerate given in
cfg.DRIVE_LOOP_HZ assuming each part finishes processing in a timely manner.
Parts may have named outputs and inputs. The framework handles passing named outputs
to parts requesting the same named input.
'''
#Initialize car
V = dk.vehicle.Vehicle()
cam = dk.parts.PiCamera(resolution=cfg.CAMERA_RESOLUTION)
V.add(cam, outputs=['cam/image_array'], threaded=True)
if use_joystick or cfg.USE_JOYSTICK_AS_DEFAULT:
#modify max_throttle closer to 1.0 to have more power
#modify steering_scale lower than 1.0 to have less responsive steering
ctr = dk.parts.JoystickController(max_throttle=cfg.JOYSTICK_MAX_THROTTLE,
steering_scale=cfg.JOYSTICK_STEERING_SCALE,
auto_record_on_throttle=cfg.AUTO_RECORD_ON_THROTTLE)
else:
#This web controller will create a web server that is capable
#of managing steering, throttle, and modes, and more.
ctr = dk.parts.LocalWebController()
V.add(ctr,
inputs=['cam/image_array'],
outputs=['user/angle', 'user/throttle', 'user/mode', 'recording'],
threaded=True)
#See if we should even run the pilot module.
#This is only needed because the part run_contion only accepts boolean
def pilot_condition(mode):
if mode == 'user':
return False
else:
return True
pilot_condition_part = dk.parts.Lambda(pilot_condition)
V.add(pilot_condition_part, inputs=['user/mode'], outputs=['run_pilot'])
#Run the pilot if the mode is not user.
kl = dk.parts.KerasCategorical()
#kl = dk.parts.KerasLinear()
if model_path:
kl.load(model_path)
V.add(kl, inputs=['cam/image_array'],
outputs=['pilot/angle', 'pilot/throttle'],
run_condition='run_pilot')
#Choose what inputs should change the car.
def drive_mode(mode,
user_angle, user_throttle,
pilot_angle, pilot_throttle):
if mode == 'user':
return user_angle, user_throttle
elif mode == 'local_angle':
return pilot_angle, user_throttle
else:
return pilot_angle, pilot_throttle
drive_mode_part = dk.parts.Lambda(drive_mode)
V.add(drive_mode_part,
inputs=['user/mode', 'user/angle', 'user/throttle',
'pilot/angle', 'pilot/throttle'],
outputs=['angle', 'throttle'])
def multiply_angle(angle):
angle = angle*1.2
return angle
multiply_angle_part = dk.parts.Lambda(multiply_angle)
V.add(multiply_angle_part,
inputs=['angle'],
outputs=['angle'],
run_condition='run_pilot')
class SmoothAngle:
"""
Wraps a function into a donkey part.
"""
def __init__(self, factor=2):
"""
Accepts the function to use.
"""
self.factor = factor
self.last = 0.0
def run(self, current):
new = (current*self.factor+self.last)/(self.factor+1)
self.last = new
return new
def shutdown(self):
return
smooth_angle_part = SmoothAngle(factor=3)
V.add(smooth_angle_part,
inputs=['angle'],
outputs=['angle'],
run_condition='run_pilot')
def scale_throttle(throttle, angle):
throttle = throttle * (1.0 - (abs(angle) * .6))
return throttle
scale_throttle_part = dk.parts.Lambda(scale_throttle)
V.add(scale_throttle_part,
inputs=['throttle', 'angle'],
outputs=['throttle'],
run_condition='run_pilot')
steering_controller = dk.parts.PCA9685(cfg.STEERING_CHANNEL)
steering = dk.parts.PWMSteering(controller=steering_controller,
left_pulse=cfg.STEERING_LEFT_PWM,
right_pulse=cfg.STEERING_RIGHT_PWM)
throttle_controller = dk.parts.PCA9685(cfg.THROTTLE_CHANNEL)
throttle = dk.parts.PWMThrottle(controller=throttle_controller,
max_pulse=cfg.THROTTLE_FORWARD_PWM,
zero_pulse=cfg.THROTTLE_STOPPED_PWM,
min_pulse=cfg.THROTTLE_REVERSE_PWM)
V.add(steering, inputs=['angle'])
V.add(throttle, inputs=['throttle'])
#add tub to save data
inputs=['cam/image_array',
'user/angle', 'user/throttle',
'user/mode']
types=['image_array',
'float', 'float',
'str']
th = dk.parts.TubHandler(path=cfg.DATA_PATH)
tub = th.new_tub_writer(inputs=inputs, types=types)
V.add(tub, inputs=inputs, run_condition='recording')
#run the vehicle for 20 seconds
V.start(rate_hz=cfg.DRIVE_LOOP_HZ,
max_loop_count=cfg.MAX_LOOPS)
print("You can now go to <your pi ip address>:8887 to drive your car.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment