Skip to content

Instantly share code, notes, and snippets.

@nbertram
Last active May 4, 2023 10:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nbertram/c31b8d77dbbffa5bb7378bf7291348af to your computer and use it in GitHub Desktop.
Save nbertram/c31b8d77dbbffa5bb7378bf7291348af to your computer and use it in GitHub Desktop.
Ardupilot ODrive control
-- config
local CONFIG_MAX_VELOCITY = 5 -- r/s
local CONFIG_MAX_TORQUE = 40 -- amps or Nm, depending on config
-- constants
local CONTROL_MODE = {
TORQUE_CONTROL = "1"
}
local AXIS_STATE = {
UNDEFINED = "0",
IDLE = "1",
STARTUP_SEQUENCE = "2",
CLOSED_LOOP_CONTROL = "8"
}
local INPUT_MODE = {
PASSTHROUGH = "1"
}
local SERVO_FUNCTION = {
STEERING = 26,
THROTTLE = 70,
THROTTLE_LEFT = 73,
THROTTLE_RIGHT = 74
}
-- startup
gcs:send_text(0, "ODrive controller loaded")
local arming_auth_id = arming:get_aux_auth_id()
local port = assert(serial:find_serial(0), "Could not find scripting port for ODrive")
port:begin(115200)
port:set_flow_control(0)
-- forward declaration
local halt, odrive, motors
local ODrive = {}
function ODrive:new()
local instance = {}
setmetatable(instance, self)
self.__index = self
return instance
end
function ODrive:send(req)
for idx = 1, #req do
port:write(req:byte(idx))
end
port:write(string.byte("\n"))
end
-- coroutine
function ODrive:request (req)
while port:available() > 0 do
-- clear out whatever's on the line before our request
port:read()
end
self:send(req)
-- allow 10ms to receive the timeout after now
local timeout = millis() + 10
-- always do first yield with nothing here to make passing params the first resume()
coroutine.yield()
local buffer = ""
while millis() <= timeout do
local n_bytes = port:available()
for _ = 1, n_bytes do
local byte = port:read()
if byte == 10 then -- \n
return buffer
end
buffer = buffer .. string.char(byte)
if string.len(buffer) > 100 then
gcs:send_text(0, "ODrive response overflow")
return buffer
end
end
-- pass control back until we're called again
coroutine.yield()
end
-- if we reach here, we timed out or too much data came in
gcs:send_text(0, "ODrive request timeout reached")
return nil
end
-- coroutine
function ODrive:get_bus_voltage()
local co = coroutine.create(self.request)
coroutine.resume(co, self, "r vbus_voltage")
local _, response
while coroutine.status(co) ~= "dead" do
_, response = coroutine.resume(co)
coroutine.yield()
end
if response == nil then
return nil
end
return response
end
function ODrive:get_global_error()
local response = self:request("r something")
if response == nil then
return nil
end
return nil
end
local Motor = {}
function Motor:new (index, servo_function, inverted)
local motor = {
index=index,
servo_function=servo_function,
inverted=inverted,
initialized=false,
armed=false
}
setmetatable(motor, self)
self.__index = self
return motor
end
function Motor:initialize()
-- some fixed configuration options
odrive:send("w axis" .. self.index .. ".config.watchdog_timeout 1")
odrive:send("w axis" .. self.index .. ".controller.config.control_mode " .. CONTROL_MODE["TORQUE_CONTROL"])
odrive:send("w axis" .. self.index .. ".controller.config.vel_limit " .. CONFIG_MAX_VELOCITY)
odrive:send("w axis" .. self.index .. ".controller.config.input_mode " .. INPUT_MODE["PASSTHROUGH"])
odrive:send("w axis" .. self.index .. ".requested_state " .. AXIS_STATE["IDLE"])
self:disarm()
self.initialized = true
gcs:send_text(0, "ODrive axis" .. self.index .. "initialized")
end
function Motor:disarm()
odrive:send("w axis" .. self.index .. ".requested_state " .. AXIS_STATE["IDLE"])
odrive:send("w axis" .. self.index .. ".config.enable_watchdog 0")
self.armed = false
end
function Motor:arm()
odrive:send("w axis" .. self.index .. ".requested_state " .. AXIS_STATE["CLOSED_LOOP_CONTROL"])
odrive:send("w axis" .. self.index .. ".config.enable_watchdog 1")
odrive:send("c " .. self.index .. "0")
self.armed = true
gcs:send_text(0, "ODrive axis" .. self.index .. "armed")
end
function Motor:get_error()
local response = odrive:request("r something")
if response == nil then
return nil
end
return nil
end
function Motor:proxy_throttle()
local pwm = SRV_Channels:get_output_pwm(self.servo_function)
if pwm < 1000 or pwm > 2000 then
halt("Servo PWM out of range: " .. pwm)
return
end
-- 1000=full backwards, 1500=stopped, 2000=full forwards
local throttle = (((pwm - 1000) * (CONFIG_MAX_TORQUE * 2)) / (2000 - 1000)) - CONFIG_MAX_TORQUE
if self.inverted then
throttle = throttle * -1
end
odrive:send("c " .. self.index .. throttle)
end
function Motor:proxy_telemetry()
local res = odrive:request("f " .. self.index)
if res ~= nil then
local _, _, rps = string.find(res, ".+ (.+)$")
if rps ~= nil then
esc_telem:update_rpm(self.index, rps * 60, 0)
end
end
res = odrive:request("r something")
if res ~= nil then
esc_telem:update_current(self.index, res)
end
end
function halt(problem)
arming:disarm()
gcs:send_text(0, "ODrive halt: " .. problem)
arming:set_aux_auth_failed(arming_auth_id, "ODrive: " .. problem)
for _, motor in ipairs(motors) do
motor:disarm()
end
end
odrive = ODrive:new()
motors = {
-- two axes
Motor:new(0, SERVO_FUNCTION["THROTTLE_LEFT"], true),
Motor:new(1, SERVO_FUNCTION["THROTTLE_RIGHT"], false)
}
local last_update_sent_at = 0
-- coroutine
local function loopHandler()
local armed = arming:is_armed()
-- start by unconditionally sending throttle/arming status
-- if the motor has tripped with an error, we'll discover that later and it'll ignore this
-- but we need to send the update first to keep the timing loop consistent.
for _, motor in ipairs(motors) do
if armed then
if not motor.armed then
motor:arm()
end
-- the actual setting of the speed
motor:proxy_throttle()
else
-- repeatedly send disarm just in case
motor:disarm()
end
end
last_update_sent_at = millis()
-- check ODrive is there by reading something
local v_co = coroutine.create(odrive.get_bus_voltage)
coroutine.resume(v_co, odrive)
local _, volts
while coroutine.status ~= "dead" do
_, volts = coroutine.resume(v_co)
coroutine.yield(1)
end
if volts == nil then
halt("not responding")
-- slower re-scheduling
return 5000
end
arming:set_aux_auth_passed(arming_auth_id)
--[[
local global_error = odrive:get_global_error()
if global_error ~= nil then
halt("global error: " .. global_error)
-- slower re-scheduling
return loop, 10000
end
]]
-- reschedule so the next update starts 20ms after the first (50Hz)
local next_update_due = last_update_sent_at + 20
if millis() < next_update_due then
return next_update_due - millis()
else
-- already overdue, run again ASAP
gcs:send_text(0, "ODrive loop overdue")
return 0
end
end
local loopCo
local function loop ()
-- run the loop as a coroutine so we can resume at the point we left off
-- after waiting for a response from the ODrive
if loopCo == nil or coroutine.status(loopCo) == "dead" then
loopCo = coroutine.create(loopHandler)
end
local _, rescheduleTime = coroutine.resume(loopCo)
assert(rescheduleTime, "ODrive rescheduleTime missing")
return loop, rescheduleTime
end
return loop, 100
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment