-
-
Save nbertram/c31b8d77dbbffa5bb7378bf7291348af to your computer and use it in GitHub Desktop.
Ardupilot ODrive control
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- config | |
local CONFIG_MAX_VELOCITY = 5 -- r/s | |
local CONFIG_MAX_TORQUE = 40 -- amps or Nm, depending on config | |
-- constants | |
local CONTROL_MODE = { | |
TORQUE_CONTROL = "1" | |
} | |
local AXIS_STATE = { | |
UNDEFINED = "0", | |
IDLE = "1", | |
STARTUP_SEQUENCE = "2", | |
CLOSED_LOOP_CONTROL = "8" | |
} | |
local INPUT_MODE = { | |
PASSTHROUGH = "1" | |
} | |
local SERVO_FUNCTION = { | |
STEERING = 26, | |
THROTTLE = 70, | |
THROTTLE_LEFT = 73, | |
THROTTLE_RIGHT = 74 | |
} | |
-- startup | |
gcs:send_text(0, "ODrive controller loaded") | |
local arming_auth_id = arming:get_aux_auth_id() | |
local port = assert(serial:find_serial(0), "Could not find scripting port for ODrive") | |
port:begin(115200) | |
port:set_flow_control(0) | |
-- forward declaration | |
local halt, odrive, motors | |
local ODrive = {} | |
function ODrive:new() | |
local instance = {} | |
setmetatable(instance, self) | |
self.__index = self | |
return instance | |
end | |
function ODrive:send(req) | |
for idx = 1, #req do | |
port:write(req:byte(idx)) | |
end | |
port:write(string.byte("\n")) | |
end | |
-- coroutine | |
function ODrive:request (req) | |
while port:available() > 0 do | |
-- clear out whatever's on the line before our request | |
port:read() | |
end | |
self:send(req) | |
-- allow 10ms to receive the timeout after now | |
local timeout = millis() + 10 | |
-- always do first yield with nothing here to make passing params the first resume() | |
coroutine.yield() | |
local buffer = "" | |
while millis() <= timeout do | |
local n_bytes = port:available() | |
for _ = 1, n_bytes do | |
local byte = port:read() | |
if byte == 10 then -- \n | |
return buffer | |
end | |
buffer = buffer .. string.char(byte) | |
if string.len(buffer) > 100 then | |
gcs:send_text(0, "ODrive response overflow") | |
return buffer | |
end | |
end | |
-- pass control back until we're called again | |
coroutine.yield() | |
end | |
-- if we reach here, we timed out or too much data came in | |
gcs:send_text(0, "ODrive request timeout reached") | |
return nil | |
end | |
-- coroutine | |
function ODrive:get_bus_voltage() | |
local co = coroutine.create(self.request) | |
coroutine.resume(co, self, "r vbus_voltage") | |
local _, response | |
while coroutine.status(co) ~= "dead" do | |
_, response = coroutine.resume(co) | |
coroutine.yield() | |
end | |
if response == nil then | |
return nil | |
end | |
return response | |
end | |
function ODrive:get_global_error() | |
local response = self:request("r something") | |
if response == nil then | |
return nil | |
end | |
return nil | |
end | |
local Motor = {} | |
function Motor:new (index, servo_function, inverted) | |
local motor = { | |
index=index, | |
servo_function=servo_function, | |
inverted=inverted, | |
initialized=false, | |
armed=false | |
} | |
setmetatable(motor, self) | |
self.__index = self | |
return motor | |
end | |
function Motor:initialize() | |
-- some fixed configuration options | |
odrive:send("w axis" .. self.index .. ".config.watchdog_timeout 1") | |
odrive:send("w axis" .. self.index .. ".controller.config.control_mode " .. CONTROL_MODE["TORQUE_CONTROL"]) | |
odrive:send("w axis" .. self.index .. ".controller.config.vel_limit " .. CONFIG_MAX_VELOCITY) | |
odrive:send("w axis" .. self.index .. ".controller.config.input_mode " .. INPUT_MODE["PASSTHROUGH"]) | |
odrive:send("w axis" .. self.index .. ".requested_state " .. AXIS_STATE["IDLE"]) | |
self:disarm() | |
self.initialized = true | |
gcs:send_text(0, "ODrive axis" .. self.index .. "initialized") | |
end | |
function Motor:disarm() | |
odrive:send("w axis" .. self.index .. ".requested_state " .. AXIS_STATE["IDLE"]) | |
odrive:send("w axis" .. self.index .. ".config.enable_watchdog 0") | |
self.armed = false | |
end | |
function Motor:arm() | |
odrive:send("w axis" .. self.index .. ".requested_state " .. AXIS_STATE["CLOSED_LOOP_CONTROL"]) | |
odrive:send("w axis" .. self.index .. ".config.enable_watchdog 1") | |
odrive:send("c " .. self.index .. "0") | |
self.armed = true | |
gcs:send_text(0, "ODrive axis" .. self.index .. "armed") | |
end | |
function Motor:get_error() | |
local response = odrive:request("r something") | |
if response == nil then | |
return nil | |
end | |
return nil | |
end | |
function Motor:proxy_throttle() | |
local pwm = SRV_Channels:get_output_pwm(self.servo_function) | |
if pwm < 1000 or pwm > 2000 then | |
halt("Servo PWM out of range: " .. pwm) | |
return | |
end | |
-- 1000=full backwards, 1500=stopped, 2000=full forwards | |
local throttle = (((pwm - 1000) * (CONFIG_MAX_TORQUE * 2)) / (2000 - 1000)) - CONFIG_MAX_TORQUE | |
if self.inverted then | |
throttle = throttle * -1 | |
end | |
odrive:send("c " .. self.index .. throttle) | |
end | |
function Motor:proxy_telemetry() | |
local res = odrive:request("f " .. self.index) | |
if res ~= nil then | |
local _, _, rps = string.find(res, ".+ (.+)$") | |
if rps ~= nil then | |
esc_telem:update_rpm(self.index, rps * 60, 0) | |
end | |
end | |
res = odrive:request("r something") | |
if res ~= nil then | |
esc_telem:update_current(self.index, res) | |
end | |
end | |
function halt(problem) | |
arming:disarm() | |
gcs:send_text(0, "ODrive halt: " .. problem) | |
arming:set_aux_auth_failed(arming_auth_id, "ODrive: " .. problem) | |
for _, motor in ipairs(motors) do | |
motor:disarm() | |
end | |
end | |
odrive = ODrive:new() | |
motors = { | |
-- two axes | |
Motor:new(0, SERVO_FUNCTION["THROTTLE_LEFT"], true), | |
Motor:new(1, SERVO_FUNCTION["THROTTLE_RIGHT"], false) | |
} | |
local last_update_sent_at = 0 | |
-- coroutine | |
local function loopHandler() | |
local armed = arming:is_armed() | |
-- start by unconditionally sending throttle/arming status | |
-- if the motor has tripped with an error, we'll discover that later and it'll ignore this | |
-- but we need to send the update first to keep the timing loop consistent. | |
for _, motor in ipairs(motors) do | |
if armed then | |
if not motor.armed then | |
motor:arm() | |
end | |
-- the actual setting of the speed | |
motor:proxy_throttle() | |
else | |
-- repeatedly send disarm just in case | |
motor:disarm() | |
end | |
end | |
last_update_sent_at = millis() | |
-- check ODrive is there by reading something | |
local v_co = coroutine.create(odrive.get_bus_voltage) | |
coroutine.resume(v_co, odrive) | |
local _, volts | |
while coroutine.status ~= "dead" do | |
_, volts = coroutine.resume(v_co) | |
coroutine.yield(1) | |
end | |
if volts == nil then | |
halt("not responding") | |
-- slower re-scheduling | |
return 5000 | |
end | |
arming:set_aux_auth_passed(arming_auth_id) | |
--[[ | |
local global_error = odrive:get_global_error() | |
if global_error ~= nil then | |
halt("global error: " .. global_error) | |
-- slower re-scheduling | |
return loop, 10000 | |
end | |
]] | |
-- reschedule so the next update starts 20ms after the first (50Hz) | |
local next_update_due = last_update_sent_at + 20 | |
if millis() < next_update_due then | |
return next_update_due - millis() | |
else | |
-- already overdue, run again ASAP | |
gcs:send_text(0, "ODrive loop overdue") | |
return 0 | |
end | |
end | |
local loopCo | |
local function loop () | |
-- run the loop as a coroutine so we can resume at the point we left off | |
-- after waiting for a response from the ODrive | |
if loopCo == nil or coroutine.status(loopCo) == "dead" then | |
loopCo = coroutine.create(loopHandler) | |
end | |
local _, rescheduleTime = coroutine.resume(loopCo) | |
assert(rescheduleTime, "ODrive rescheduleTime missing") | |
return loop, rescheduleTime | |
end | |
return loop, 100 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment