Last active
February 2, 2023 22:28
-
-
Save ConnorWGarvey/48484f93052ebf41211835ed2110bced to your computer and use it in GitHub Desktop.
Every new version of BlueMan or PulseAudio on Ubuntu breaks my Bluetooth headphones in new ways. Lately, I have to connect, set the profile to "off", set the profile to "headset", disconnect, reconnect, set the profile to "high fidelity audio". Here's a script that does all of that. Put it on your path and run "headphones".
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
# Requires these libraries:
#
#   $ gem install colorize open4
#
# To restart the Bluetooth service manually:
#
#   $ sudo service bluetooth restart
# or
#   $ sudo /etc/init.d/bluetooth restart
require 'colorize' | |
require 'io/console' | |
require 'open3' | |
require 'open4' | |
require 'ostruct' | |
# Runs commands on the local machine and captures their combined
# stdout/stderr output.
class Local
  # Runs a command via #ex (same keyword arguments) and returns its output.
  #
  # Raises RuntimeError, with the captured output as the message, when the
  # command's exit status is not zero.
  def check_ex(**args)
    code, out = ex(**args)
    raise out unless code == 0

    out
  end

  # Runs +command+ and returns +[status, output]+.
  #
  # command::           command line handed to Open3.popen2e
  # echo::              when true, announce the command on STDERR first
  # input::             optional text written to the command's stdin
  # stream::            when true, print each output line as it arrives
  # working_directory:: directory to run in (defaults to the current one)
  #
  # +status+ is the value of the wait thread (the process status) and
  # +output+ is stdout and stderr interleaved into one String.
  def ex(command: nil, echo: true, input: nil, stream: false, working_directory: nil)
    status = nil
    output = +''
    Dir.chdir(working_directory || Dir.pwd) do
      STDERR.puts "Running: #{command}".yellow if echo
      Open3.popen2e(command) do |stdin, out, wait_thread|
        stdin.puts(input) if input
        # Always close stdin: a child that reads stdin would otherwise never
        # see EOF and the read loop below would block forever.
        stdin.close
        out.each do |line|
          output << line
          puts line if stream
        end
        status = wait_thread.value
      end
    end
    [status, output]
  end
end
# Wrapper around `bluetoothctl` that locates, connects and disconnects the
# Bluetooth device advertising the "Audio Sink" service.
class Bluetooth
  MAC_REGEX = '(?:[0-9A-F]{2}\:){5}[0-9A-F]{2}'
  # Extracts mac, name and connection state from `bluetoothctl info` output.
  INFO_REGEX = /info #{MAC_REGEX}\s*Device (?<mac>#{MAC_REGEX}).*?Name: (?<name>.*?)$.*?Connected: (?<connected>\w+)/m

  attr_reader :audio, :local

  # audio:: Audio collaborator (a new one is built when omitted)
  # local:: command runner responding to #check_ex (defaults to Local.new)
  def initialize(audio: nil, local: nil)
    @local = local || Local.new
    @audio = audio || Audio.new(bluetooth: self, local: @local)
  end

  # Connects the device and waits until bluetoothctl reports it connected.
  #
  # Returns the device (an OpenStruct with +connected+, +mac+, +name+),
  # including when it was already connected and nothing had to be done.
  # Raises RuntimeError if the device does not connect in time.
  def connect(match_device: nil)
    current = device(match_device: match_device)
    return current if current.connected

    target = match_device || current
    execute "connect #{target.mac}"
    print "Waiting for #{target.name} to connect ...".yellow
    wait_for_device { device(match_device: match_device).connected }
    device(match_device: match_device)
  end

  # Returns the audio device as an OpenStruct (+connected+, +mac+, +name+).
  # The result is cached until the wait helpers reset it.
  #
  # With exactly one audio-sink device known, that device is returned.
  # With several, +match_device+ (compared by +mac+) picks one; without it
  # the single connected device is returned. Raises RuntimeError when no
  # unambiguous choice exists.
  def device(match_device: nil)
    @device ||= begin
      infos = audio_sink_infos
      if infos.size == 1
        parse_device(infos.first)
      else
        candidates = infos.map { |info| parse_device(info) }
        if match_device
          candidates.find { |c| c.mac == match_device.mac } ||
            raise("No device matching #{match_device} in #{candidates}")
        else
          connected = candidates.select { |c| c.connected }
          unless connected.size == 1
            raise "#{infos.join("\n\n")}\n\nNeed one device. Found #{infos.size}."
          end
          connected.first
        end
      end
    end
  end

  # Disconnects the device (the current one unless +match_device+ is given)
  # and waits until bluetoothctl reports it disconnected.
  #
  # Returns the device object that was asked to disconnect.
  def disconnect(match_device: nil)
    match_device ||= device
    execute "disconnect #{match_device.mac}"
    print "Waiting for #{match_device.name} to disconnect ...".yellow
    wait_for_device { !device(match_device: match_device).connected }
    match_device
  end

  private

  # Feeds one command to bluetoothctl followed by "quit"; returns its output.
  def execute(command)
    local.check_ex command: 'bluetoothctl', echo: false, input: "#{command}\nquit"
  end

  # Returns the `bluetoothctl info` text of every known device that lists
  # the Audio Sink UUID.
  def audio_sink_infos
    listing = execute 'devices'
    macs = listing.scan(/^Device (#{MAC_REGEX}) (.*?)$/).map { |mac, _name| mac }
    macs.map { |mac| execute("info #{mac}") }.select { |info| info =~ /UUID\: Audio Sink/ }
  end

  # Builds the device OpenStruct from one `bluetoothctl info` text.
  def parse_device(info_text)
    parsed = info_text.match(INFO_REGEX)
    raise "Could not parse bluetoothctl device info:\n#{info_text}" unless parsed

    OpenStruct.new connected: parsed[:connected] == 'yes', mac: parsed[:mac], name: parsed[:name]
  end

  # Polls the block once per second, printing a countdown, until it returns
  # a truthy value. Raises RuntimeError after +timeout+ attempts.
  def wait_for(timeout: 20)
    timeout.times do |elapsed|
      if yield
        puts ""
        return
      end
      print " #{timeout - elapsed}".yellow
      sleep 1
    end
    raise 'Timed out waiting for something'
  end

  # Like #wait_for, but drops the cached device before every poll so the
  # block observes fresh bluetoothctl state.
  def wait_for_device
    wait_for do
      @device = nil
      yield
    end
  end
end
# Wrapper around `pactl` for the PulseAudio card that belongs to the
# Bluetooth audio device.
class Audio
  # Friendly profile name => PulseAudio card profile name.
  PROFILES = { headset: 'headset_head_unit', headphones: 'a2dp_sink', off: 'off' }.freeze
  # Number of one-second polls to wait for the card to appear.
  DEVICE_TIMEOUT = 20

  attr_reader :bluetooth, :local

  # bluetooth:: Bluetooth collaborator (a new one is built when omitted)
  # local::     command runner responding to #ex and #check_ex
  def initialize(bluetooth: nil, local: nil)
    @local = local || Local.new
    @bluetooth = bluetooth || Bluetooth.new(audio: self, local: @local)
  end

  # Looks up the PulseAudio card of the current Bluetooth device.
  #
  # The card is queried afresh on every call, so the id is never stale.
  # With +do_retry: :no+ a missing card yields nil immediately; otherwise
  # the lookup is polled once per second and RuntimeError is raised when
  # the card never appears.
  #
  # Returns an OpenStruct with +id+, +name+ and +profile+, or nil.
  def device(do_retry: :yes)
    bluetooth_device = bluetooth.device
    found = find_device(mac: bluetooth_device.mac, name: bluetooth_device.name)
    return found if found || do_retry == :no

    print "Waiting for audio device #{bluetooth_device.name} ...".yellow
    DEVICE_TIMEOUT.times do |elapsed|
      found = find_device(mac: bluetooth_device.mac, name: bluetooth_device.name)
      if found
        puts ""
        return found
      end
      print " #{DEVICE_TIMEOUT - elapsed}".yellow
      sleep 1
    end
    raise "Timed out waiting for audio device #{bluetooth_device.name}"
  end

  # Searches `pactl list` for the bluez card with the given MAC address.
  #
  # Returns an OpenStruct (+id+ as a String, +name+ as passed in, +profile+
  # as a friendly symbol) or nil when no such card is listed. Raises
  # RuntimeError when the card's active profile is not a known one.
  def find_device(mac:, name:)
    underscore_mac = mac.gsub ':', '_'
    out = local.check_ex command: 'pactl list', echo: false
    m = out.match(/Card #(?<id>\d+)\s+Name: bluez_card\.#{underscore_mac}.*?Active Profile: (?<profile>[\w_]+)/m)
    m ? OpenStruct.new(id: m[:id], name: name, profile: profile_name(m[:profile])) : nil
  end

  # Makes the Bluetooth sink for the card's active profile the default
  # PulseAudio sink. Returns the freshly looked-up card.
  def set_default
    name = bluetooth.device.mac.gsub ':', '_'
    command = "pactl set-default-sink bluez_sink.#{name}.#{pulse_profile_name(device.profile.to_sym)}"
    local.check_ex command: command, echo: false
    device
  end

  # Switches the card to the profile +name+ (:off, :headphones or :headset).
  #
  # When pactl answers "No such entity" the card is looked up again and the
  # command is rebuilt with its current id before one more attempt. Any
  # other failure raises RuntimeError. Returns the freshly looked-up card.
  def set_profile(name)
    pulse_name = pulse_profile_name(name)
    pactl_command = card_profile_command(pulse_name)
    code, out = local.ex command: pactl_command, echo: false
    unless code == 0
      unless out.strip.include?('No such entity')
        raise "PulseAudio error:\n #{out.strip}\n from command:`#{pactl_command}`"
      end
      # Re-resolve the card so the retry does not reuse the id that was
      # just rejected.
      local.check_ex command: card_profile_command(pulse_name), echo: false
    end
    device
  end

  private

  # Builds the pactl command that selects +pulse_name+ on the current card.
  def card_profile_command(pulse_name)
    "pactl set-card-profile #{device.id} #{pulse_name}"
  end

  # PulseAudio profile string -> friendly symbol.
  def profile_name(name)
    PROFILES.key(name) || raise("Unknown profile: #{name}")
  end

  # Friendly symbol -> PulseAudio profile string.
  def pulse_profile_name(name)
    PROFILES.fetch(name) { raise "Unknown profile: #{name}" }
  end
end
# Drives the whole recovery sequence: reconnect the headphones, switch the
# card to the headset profile, reconnect again, then select the
# high-fidelity profile and make it the default sink.
class Fixer
  # Runs the sequence once.
  #
  # Returns an OpenStruct with the audio card +id+, the Bluetooth device
  # +name+ and the resulting audio +profile+. Errors raised by the
  # Bluetooth/Audio helpers propagate to the caller.
  def fix
    bluetooth = Bluetooth.new
    audio = bluetooth.audio
    bluetooth.connect
    reconnect(bluetooth)
    sleep 1
    device = audio.device(do_retry: :no)
    if device
      puts "Resetting #{device.name}:#{device.profile}".yellow
      device = audio.set_profile(:headset)
      sleep 1
      puts "Now a headset: #{device.name}:#{device.profile}".yellow
    end
    bluetooth_device = reconnect(bluetooth)
    audio_device = audio.set_profile(:headphones)
    sleep 1
    audio.set_default
    sleep 1
    puts "Now default audio device: #{audio_device.name}".yellow
    # The id belongs to the audio card; the Bluetooth device has no id field.
    OpenStruct.new(id: audio_device.id, name: bluetooth_device.name, profile: audio_device.profile)
  end

  private

  # Disconnects and reconnects the device, reporting each step.
  # Returns the reconnected Bluetooth device.
  def reconnect(bluetooth)
    device = bluetooth.disconnect
    sleep 1
    puts "Now disconnected: #{device.name}".yellow
    # connect may yield nil if the device is already connected again; keep
    # the device we already hold in that case.
    device = bluetooth.connect(match_device: device) || device
    sleep 1
    puts "Now connected: #{device.name}".yellow
    device
  end
end
# Entry point: try the fix once; if the headphones do not end up on the
# high-fidelity profile, restart the Bluetooth service and try once more.
fixer = Fixer.new
device = fixer.fix
if device.profile == :headphones
  puts "Now high fidelity audio: #{device.name}:#{device.profile}".yellow
else
  puts "FAILED ".red + "Restarting bluetooth service, which requires root privileges".yellow
  print 'Password: '
  # gets returns nil at end of input, hence to_s before chomp.
  password = STDIN.noecho(&:gets).to_s.chomp
  puts "" # the newline typed after the password was not echoed
  # -S makes sudo read the password from stdin (it reads the terminal
  # otherwise); -p '' suppresses sudo's own prompt.
  restart_output, restart_status = Open3.capture2e(
    'sudo', '-S', '-p', '', 'service', 'bluetooth', 'restart',
    stdin_data: "#{password}\n"
  )
  puts restart_output.strip.red unless restart_status.success?
  device = fixer.fix
  if device.profile == :headphones
    puts "Now high fidelity audio: #{device.name}:#{device.profile}".yellow
  else
    puts "FAILED Not sure what to do now...".red
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment