#! /usr/bin/env python3
"""Fixing bluetooth stereo headphone/headset problem in debian distros.

Workaround for bug: https://bugs.launchpad.net/ubuntu/+source/indicator-sound/+bug/1577197
Run it with python3.5 or higher after pairing/connecting the bluetooth stereo headphone.

This only fixes the bluez5 problem mentioned above.

Licence: Freeware

Install with:
    curl "https://gist.githubusercontent.com/pylover/d68be364adac5f946887b85e6ed6e7ae/raw/install.sh" | sh

Shorthands:

    $ alias speakers="a2dp.py 10:08:C1:44:AE:BC"
    $ alias headphones="a2dp.py 00:22:37:3D:DA:50"
    $ alias headset="a2dp.py 00:22:37:F8:A0:77 -p hsp"

    $ speakers

See ``a2dp.py -h`` for help.

Check here for updates: https://gist.github.com/pylover/d68be364adac5f946887b85e6ed6e7ae

Thanks to:

 * https://github.com/DominicWatson, for adding the ``-p/--profile`` argument.
 * https://github.com/IzzySoft, for mentioning wait before connecting again.
 * https://github.com/AmploDev, for v0.4.0
 * https://github.com/Mihara, for autodetect & autorun service
 * https://github.com/dabrovnijk, for systemd service

Change Log
----------

- 0.6.2
  * Fix program name inside the help message.

- 0.6.1
  * Fix Py warning

- 0.6.0
  * Install script

- 0.5.2
  * Increasing the number of tries to 15.

- 0.5.2
  * Optimizing waits.

- 0.5.1
  * Increasing WAIT_TIME and TRIES

- 0.5.0
  * Autodetect & autorun service

- 0.4.1
  * Sorting device list

- 0.4.0
  * Adding ignore_fail argument by @AmploDev.
  * Sending all available streams into selected sink, after successful connection by @AmploDev.

- 0.3.3
  * Updating default sink before turning to ``off`` profile.

- 0.3.2
  * Waiting a bit: ``-w/--wait`` before connecting again.

- 0.3.0
  * Adding -p / --profile option for using the same script to switch between headset and A2DP audio profiles

- 0.2.5
  * Mentioning [mac] argument.

- 0.2.4
  * Removing duplicated devices in select device list.

- 0.2.3
  * Matching ANSI escape characters. Tested on 16.10 & 16.04

- 0.2.2
  * Some sort of code enhancements.

- 0.2.0
  * Adding `-V/--version`, `-w/--wait` and `-t/--tries` CLI arguments.

- 0.1.1
  * Supporting the `[NEW]` prefix for devices & controllers as advised by @wdullaer
  * Drying the code.

"""
import sys
import re
import asyncio
import subprocess as sb
import argparse


__version__ = '0.6.2'


HEX_DIGIT_PATTERN = '[0-9A-F]'
HEX_BYTE_PATTERN = '%s{2}' % HEX_DIGIT_PATTERN
MAC_ADDRESS_PATTERN = ':'.join((HEX_BYTE_PATTERN, ) * 6)
# Raw strings keep `\s` from being read as an (invalid) string escape.
DEVICE_PATTERN = re.compile(r'^(?:.*\s)?Device\s(?P<mac>%s)\s(?P<name>.*)' % MAC_ADDRESS_PATTERN)
CONTROLLER_PATTERN = re.compile(r'^(?:.*\s)?Controller\s(?P<mac>%s)\s(?P<name>.*)' % MAC_ADDRESS_PATTERN)
WAIT_TIME = 2.25
TRIES = 15
PROFILE = 'a2dp'


_profiles = {
    'a2dp': 'a2dp_sink',
    'hsp': 'headset_head_unit',
    'off': 'off'
}

# CLI Arguments
parser = argparse.ArgumentParser()
parser.add_argument('-e', '--echo', action='store_true', default=False,
                    help='If given, the subprocess stdout will be also printed on stdout.')
parser.add_argument('-w', '--wait', default=WAIT_TIME, type=float,
                    help='The seconds to wait for subprocess output, default is: %s' % WAIT_TIME)
parser.add_argument('-t', '--tries', default=TRIES, type=int,
                    help='The number of tries if subprocess is failed. default is: %s' % TRIES)
parser.add_argument('-p', '--profile', default=PROFILE,
                    help='The profile to switch to. available options are: hsp, a2dp. default is: %s' % PROFILE)
parser.add_argument('-V', '--version', action='store_true', help='Show the version.')
parser.add_argument('mac', nargs='?', default=None)


# Exceptions
class SubprocessError(Exception):
    pass


class RetryExceededError(Exception):
    pass


class BluetoothctlProtocol(asyncio.SubprocessProtocol):
    def __init__(self, exit_future, echo=True):
        self.exit_future = exit_future
        self.transport = None
        self.output = None
        self.echo = echo

    def listen_output(self):
        self.output = ''

    def not_listen_output(self):
        self.output = None

    def pipe_data_received(self, fd, raw):
        d = raw.decode()
        if self.echo:
            print(d, end='')

        if self.output is not None:
            self.output += d

    def process_exited(self):
        self.exit_future.set_result(True)

    def connection_made(self, transport):
        self.transport = transport
        print('Connection MADE')

    async def send_command(self, c):
        stdin_transport = self.transport.get_pipe_transport(0)
        # noinspection PyProtectedMember
        stdin_transport._pipe.write(('%s\n' % c).encode())

    async def search_in_output(self, expression, fail_expression=None):
        if self.output is None:
            return None

        for l in self.output.splitlines():
            if fail_expression and re.search(fail_expression, l, re.IGNORECASE):
                raise SubprocessError('Expression "%s" failed with fail pattern: "%s"' % (l, fail_expression))

            if re.search(expression, l, re.IGNORECASE):
                return True

    async def send_and_wait(self, cmd, wait_expression, fail_expression='fail'):
        try:
            self.listen_output()
            await self.send_command(cmd)
            while not await self.search_in_output(wait_expression.lower(), fail_expression=fail_expression):
                await wait()
        finally:
            self.not_listen_output()

    async def disconnect(self, mac):
        print('Disconnecting the device.')
        await self.send_and_wait('disconnect %s' % ':'.join(mac), 'Successful disconnected')

    async def connect(self, mac):
        print('Connecting again.')
        await self.send_and_wait('connect %s' % ':'.join(mac), 'Connection successful')

    async def trust(self, mac):
        await self.send_and_wait('trust %s' % ':'.join(mac), 'trust succeeded')

    async def quit(self):
        await self.send_command('quit')

    async def get_list(self, command, pattern):
        result = set()
        try:
            self.listen_output()
            await self.send_command(command)
            await wait()
            for l in self.output.splitlines():
                m = pattern.match(l)
                if m:
                    result.add(m.groups())
            return sorted(list(result), key=lambda i: i[1])
        finally:
            self.not_listen_output()

    async def list_devices(self):
        return await self.get_list('devices', DEVICE_PATTERN)

    async def list_paired_devices(self):
        return await self.get_list('paired-devices', DEVICE_PATTERN)

    async def list_controllers(self):
        return await self.get_list('list', CONTROLLER_PATTERN)

    async def select_paired_device(self):
        print('Selecting device:')
        devices = await self.list_paired_devices()
        count = len(devices)

        if count < 1:
            raise SubprocessError('There is no connected device.')
        elif count == 1:
            return devices[0]

        for i, d in enumerate(devices):
            print('%d. %s %s' % (i+1, d[0], d[1]))
        print('Select device[1]:')
        selected = input()
        return devices[0 if not selected.strip() else (int(selected) - 1)]


async def wait(delay=None):
    # An explicit delay wins; otherwise fall back to the global WAIT_TIME.
    return await asyncio.sleep(WAIT_TIME if delay is None else delay)


async def execute_command(cmd, ignore_fail=False):
    p = await asyncio.create_subprocess_shell(cmd, stdout=sb.PIPE, stderr=sb.PIPE)
    stdout, stderr = await p.communicate()
    stdout, stderr = \
        stdout.decode() if stdout is not None else '', \
        stderr.decode() if stderr is not None else ''
    if p.returncode != 0 or stderr.strip() != '':
        message = 'Command: %s failed with status: %s\nstderr: %s' % (cmd, p.returncode, stderr)
        if ignore_fail:
            print('Ignoring: %s' % message)
        else:
            raise SubprocessError(message)
    return stdout


async def execute_find(cmd, pattern, tries=0, fail_safe=False):
    tries = tries or TRIES

    message = 'Cannot find `%s` using `%s`.' % (pattern, cmd)
    retry_message = message + ' Retrying %d more times'
    while True:
        stdout = await execute_command(cmd)
        match = re.search(pattern, stdout)
        if match:
            return match.group()
        elif tries > 0:
            await wait()
            print(retry_message % tries)
            tries -= 1
            continue

        if fail_safe:
            return None

        raise RetryExceededError('Retry times exceeded: %s' % message)


async def find_dev_id(mac, **kw):
    return await execute_find('pactl list cards short', 'bluez_card.%s' % '_'.join(mac), **kw)


async def find_sink(mac, **kw):
    return await execute_find('pacmd list-sinks', 'bluez_sink.%s' % '_'.join(mac), **kw)


async def set_profile(device_id, profile):
    print('Setting the %s profile' % profile)
    try:
        return await execute_command('pactl set-card-profile %s %s' % (device_id, _profiles[profile]))
    except KeyError:
        print('Invalid profile: %s, please select one of a2dp or hsp.' % profile, file=sys.stderr)
        raise SystemExit(1)


async def set_default_sink(sink):
    print('Updating default sink to %s' % sink)
    return await execute_command('pacmd set-default-sink %s' % sink)


async def move_streams_to_sink(sink):
    streams = await execute_command('pacmd list-sink-inputs | grep "index:"', True)
    for i in streams.split():
        i = ''.join(n for n in i if n.isdigit())
        if i != '':
            print('Moving stream %s to sink' % i)
            await execute_command('pacmd move-sink-input %s %s' % (i, sink))
    return sink


async def main(args):
    global WAIT_TIME, TRIES

    if args.version:
        print(__version__)
        return 0

    mac = args.mac

    # Hacking, Changing the constants!
    WAIT_TIME = args.wait
    TRIES = args.tries

    exit_future = asyncio.Future()

    transport, protocol = await asyncio.get_event_loop().subprocess_exec(
        lambda: BluetoothctlProtocol(exit_future, echo=args.echo), 'bluetoothctl'
    )

    try:
        if mac is None:
            mac, _ = await protocol.select_paired_device()

        mac = mac.split(':' if ':' in mac else '_')
        print('Device MAC: %s' % ':'.join(mac))

        device_id = await find_dev_id(mac, fail_safe=True)
        if device_id is None:
            print('It seems device: %s is not connected yet, trying to connect.' % ':'.join(mac))
            await protocol.trust(mac)
            await protocol.connect(mac)
            device_id = await find_dev_id(mac)

        sink = await find_sink(mac, fail_safe=True)
        if sink is None:
            await set_profile(device_id, args.profile)
            sink = await find_sink(mac)

        print('Device ID: %s' % device_id)
        print('Sink: %s' % sink)

        await set_default_sink(sink)
        await wait()

        await set_profile(device_id, 'off')

        if args.profile == 'a2dp':
            await protocol.disconnect(mac)
            await wait()
            await protocol.connect(mac)

            device_id = await find_dev_id(mac)
            print('Device ID: %s' % device_id)

            await wait(2)

        await set_profile(device_id, args.profile)
        await set_default_sink(sink)
        await move_streams_to_sink(sink)

    except (SubprocessError, RetryExceededError) as ex:
        print(str(ex), file=sys.stderr)
        return 1
    finally:
        print('Exiting bluetoothctl')
        await protocol.quit()
        await exit_future

        # Close the stdout pipe
        transport.close()

    if args.profile == 'a2dp':
        print('"Enjoy" the HiFi stereo music :)')
    else:
        print('"Enjoy" your headset audio :)')


if __name__ == '__main__':
    sys.exit(asyncio.get_event_loop().run_until_complete(main(parser.parse_args())))

#!/usr/bin/python

import dbus
from dbus.mainloop.glib import DBusGMainLoop
import gobject

import subprocess

import time

# Where you keep the above script. Must be executable, obviously.
A2DP = '/home/mihara/.local/bin/a2dp'

# A list of device IDs that you wish to run it on.
DEV_IDS = ['00:18:09:30:FC:D8', 'FC:58:FA:B1:2D:25']

device_paths = []
devices = []

dbus_loop = DBusGMainLoop()
bus = dbus.SystemBus(mainloop=dbus_loop)


def generate_handler(device_id):

    # A one-element list, because Python 2 has no `nonlocal`:
    # a plain variable cannot be rebound from inside cb().
    last_ran = [0]

    def cb(*args, **kwargs):
        if args[0] == 'org.bluez.MediaControl1':
            if args[1].get('Connected'):
                print("Connected {}".format(device_id))
                # Run the fix at most once every 120 seconds per device.
                if last_ran[0] < time.time() - 120:
                    print("Fixing...")
                    subprocess.call([A2DP, device_id])
                    last_ran[0] = time.time()
            else:
                print("Disconnected {}".format(device_id))

    return cb


# Figure out the path to the headset
man = bus.get_object('org.bluez', '/')
iface = dbus.Interface(man, 'org.freedesktop.DBus.ObjectManager')
for device in iface.GetManagedObjects().keys():
    for id in DEV_IDS:
        if device.endswith(id.replace(':', '_')):
            device_paths.append((id, device))

for id, device in device_paths:
    headset = bus.get_object('org.bluez', device)
    headset.connect_to_signal("PropertiesChanged", generate_handler(id), dbus_interface='org.freedesktop.DBus.Properties')
    devices.append(headset)

loop = gobject.MainLoop()
loop.run()

[Unit]
Description=Fix BT Speaker

[Service]
Type=simple
Restart=always
ExecStart=/usr/bin/fix-bt.py

[Install]
WantedBy=multi-user.target

#! /usr/bin/env bash

URL="https://gist.githubusercontent.com/pylover/d68be364adac5f946887b85e6ed6e7ae/raw/a2dp.py"
TARGET=/usr/local/bin/a2dp.py

curl "$URL" | sudo tee "${TARGET}" > /dev/null
sudo chmod +x "$TARGET"
echo "a2dp installation was successful."
echo "Run a2dp.py [-h/--help] and enjoy the HiFi music!"

So every time I restarted, pactl list was unavailable because pipewire pulseaudio was masked,
sudo apt update
sudo apt install --reinstall pipewire pipewire-pulse pipewire-audio-client-libraries
After this, no other tricks worked and audio never worked again, so I gave up. I could uninstall and reinstall pulseaudio, but I can't be bothered.
I kinda solved it, these were the steps...
upgraded to latest version pipewire ppa
remove blueman
edit /etc/bluetooth/audio.conf
~$ cat /etc/bluetooth/audio.conf
[General]
Enable=Control,Gateway,Headset,Media,Sink,Socket,Source
mv .config/systemd to .config/systemd.backup
uninstall https://old.reddit.com/r/pop_os/comments/ol9nxx/installing_latest_pipewire_with_working_bluetooth/
install wireplumber
sudo rm /etc/systemd/user/pipewire-media-session.service
systemctl --user --now enable wireplumber.service
systemctl --user start wireplumber.service
sudo rm -rf /var/lib/bluetooth/*
DEVICE_MAC=$(bluetoothctl devices | awk '/Device/{print $2}')
# Step 5: Pair, trust, and connect to the Bluetooth device
echo "Pairing, trusting, and connecting to the Bluetooth device..."
echo -e "pair $DEVICE_MAC\ntrust $DEVICE_MAC\nconnect $DEVICE_MAC\nquit" | bluetoothctl
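
One caveat with the `awk` line above: it prints the MAC of every device `bluetoothctl devices` lists, so with more than one known device `$DEVICE_MAC` ends up holding several addresses. Here is a minimal Python sketch of the same parsing that keeps each device separate. It assumes the `Device <MAC> <name>` line format that a2dp.py's `DEVICE_PATTERN` also expects; `parse_devices` and the device names in the sample are hypothetical.

```python
import re

# Same line shape that a2dp.py's DEVICE_PATTERN matches: "Device <MAC> <name>",
# optionally preceded by a prefix such as "[NEW]".
DEVICE_RE = re.compile(
    r'^(?:.*\s)?Device\s(?P<mac>(?:[0-9A-F]{2}:){5}[0-9A-F]{2})\s(?P<name>.*)$'
)


def parse_devices(output):
    """Return a list of (mac, name) tuples found in `bluetoothctl devices` output."""
    devices = []
    for line in output.splitlines():
        match = DEVICE_RE.match(line.strip())
        if match:
            devices.append((match.group('mac'), match.group('name')))
    return devices


# Hypothetical sample; real output depends on the devices your controller knows.
sample = (
    "Device 00:22:37:3D:DA:50 Headphones\n"
    "Device 10:08:C1:44:AE:BC Speakers\n"
)
devices = parse_devices(sample)
```

From there you can pick one entry explicitly (for example `devices[0][0]`) instead of passing every MAC to `pair`, `trust` and `connect` at once.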
New problem: now it is either a2dp or hfp, can't switch, but now I get 3 different profiles:
```
Profile:
off: Off (Ziele: 0, Quellen: 0, Priorität: 0, verfügbar: ja)
a2dp-sink-sbc: High Fidelity Playback (A2DP Sink, codec SBC) (Ziele: 1, Quellen: 0, Priorität: 18, verfügbar: ja)
a2dp-sink-sbc_xq: High Fidelity Playback (A2DP Sink, codec SBC-XQ) (Ziele: 1, Quellen: 0, Priorität: 17, verfügbar: ja)
a2dp-sink: High Fidelity Playback (A2DP Sink, codec AAC) (Ziele: 1, Quellen: 0, Priorität: 19, verfügbar: ja)
Aktives Profil: a2dp-sink
```
Remove bluetooth, reconnect, and pray you get the hfp profile.
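
Note that the profile names in the listing above use hyphens (`a2dp-sink`), while a2dp.py's `_profiles` table hard-codes `a2dp_sink` and `headset_head_unit`. That suggests the script's names may not match what this PipeWire setup reports. Here is a rough Python sketch, not part of the script, that reads the profile names out of such a listing and looks one up regardless of hyphen or underscore spelling. `parse_profile_names` and `find_profile` are hypothetical helpers, and the sample is abbreviated from the output above.

```python
import re

# A profile line looks like "<name>: <description> (...)". The "Profile:" header
# has nothing after the colon, and the "active profile" line has a space in its
# label, so neither matches.
PROFILE_LINE_RE = re.compile(r'^\s*(?P<name>[\w-]+): \S')


def parse_profile_names(listing):
    """Return the profile names in the order they appear in the listing."""
    names = []
    for line in listing.splitlines():
        match = PROFILE_LINE_RE.match(line)
        if match:
            names.append(match.group('name'))
    return names


def find_profile(names, wanted):
    """Return the reported name equal to `wanted`, ignoring '-' versus '_'."""
    target = wanted.replace('_', '-')
    for name in names:
        if name.replace('_', '-') == target:
            return name
    return None


sample = """Profile:
off: Off (Ziele: 0, Quellen: 0)
a2dp-sink-sbc: High Fidelity Playback (A2DP Sink, codec SBC)
a2dp-sink-sbc_xq: High Fidelity Playback (A2DP Sink, codec SBC-XQ)
a2dp-sink: High Fidelity Playback (A2DP Sink, codec AAC)
Aktives Profil: a2dp-sink
"""
names = parse_profile_names(sample)
chosen = find_profile(names, 'a2dp_sink')
```

The name this returns is the one to pass to `pactl set-card-profile`. If `find_profile` returns `None`, as it would for `headset_head_unit` with this listing, the card is not offering that profile at the moment.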
Settings
└─ Default Configured Node Names:
0. Audio/Sink bluez_output.E8_AB_FA_40_25_3B.headset-head-unit
kreijstal@kreijstalnuc:~$ pactl list sinks short
165 bluez_output.E8_AB_FA_40_25_3B.headset-head-unit PipeWire s16le 1ch 16000Hz RUNNING
It's either stuck on headset-head-unit or a2dp-sink, and only with wireplumber. Things worked before, but oh well. omg
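
The sink here is named `bluez_output.<MAC>.<profile>`, whereas a2dp.py's `find_sink` searches `pacmd list-sinks` for `bluez_sink.<MAC>`, so the script would likely not find this sink as written. Here is a small Python sketch, assuming the column layout shown above, that accepts either prefix. `parse_short_sinks` is a hypothetical helper, and the second sample name in the usage note is made up to illustrate the other scheme.

```python
import re

# Accepts both naming schemes seen in this thread: "bluez_sink.<MAC>..." (what
# a2dp.py searches for) and "bluez_output.<MAC>.<profile>" (the output above).
SINK_RE = re.compile(
    r'bluez_(?:sink|output)\.'
    r'(?P<mac>(?:[0-9A-F]{2}_){5}[0-9A-F]{2})'
    r'(?:\.(?P<profile>[\w-]+))?'
)


def parse_short_sinks(output):
    """Return (index, mac, profile) for each Bluetooth sink in
    `pactl list sinks short` output; profile is None when the name has no suffix."""
    sinks = []
    for line in output.splitlines():
        fields = line.split()
        if len(fields) < 2:
            continue
        match = SINK_RE.fullmatch(fields[1])
        if match:
            mac = match.group('mac').replace('_', ':')
            sinks.append((int(fields[0]), mac, match.group('profile')))
    return sinks


sample = ("165 bluez_output.E8_AB_FA_40_25_3B.headset-head-unit "
          "PipeWire s16le 1ch 16000Hz RUNNING")
sinks = parse_short_sinks(sample)
```

The same function also handles a name such as `bluez_sink.00_22_37_3D_DA_50.a2dp_sink`, so one check can tell you which profile the device is currently stuck on under either naming scheme.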
So I rebooted and sound was gone again, and I performed my daily ritual of trying random commands to see what sticks to the wall. I don't like casting
sudo dpkg-reconfigure bluez
sudo dpkg-reconfigure linux-sound-base
but it seems in this case that did the trick along with other things.. let's see if it survives this reboot,
everything was fixed with Mint 22. Finally.
I just need to start the pipewire service locally, not globally, so no pulseaudio, right,
systemctl --user status pipewire
should fix pactl lists not being available, right?
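
A note on that last question: `systemctl --user status` only reports the state, it does not start anything, and `pactl` talks to the PulseAudio-compatible server, which on a PipeWire setup is `pipewire-pulse`. Here is a hedged Python sketch that checks the relevant user units and suggests a command. The unit names in `UNITS` are an assumption about a typical PipeWire install, and `suggest_fix` and `report` are hypothetical helpers.

```python
import subprocess

# Assumed unit names for a typical PipeWire + WirePlumber setup; adjust to yours.
UNITS = ('pipewire.service', 'pipewire-pulse.service', 'wireplumber.service')


def unit_state(unit):
    """Return what `systemctl --user is-enabled <unit>` prints (e.g. 'enabled',
    'disabled', 'masked'), or '' if the state could not be read."""
    result = subprocess.run(
        ['systemctl', '--user', 'is-enabled', unit],
        stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
        universal_newlines=True,
    )
    return result.stdout.strip()


def suggest_fix(unit, state):
    """Map a unit state to a command worth trying, or None if nothing is obvious."""
    if state == 'masked':
        return 'systemctl --user unmask %s' % unit
    if state == 'disabled':
        return 'systemctl --user enable --now %s' % unit
    return None


def report():
    """Print the state of each unit and a suggested command where one applies."""
    for unit in UNITS:
        state = unit_state(unit)
        fix = suggest_fix(unit, state)
        print('%s: %s%s' % (unit, state or 'unknown', ' -> try: %s' % fix if fix else ''))
```

Call `report()` from your own user session, not as root, since these are `--user` units. A masked `pipewire-pulse.service` would match the earlier symptom of `pactl list` being unavailable after every restart.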