Created
October 28, 2018 19:03
-
-
Save User65k/8b14149ca1b00dc9c8c1a8c5f552d8f6 to your computer and use it in GitHub Desktop.
Switch between Bluetooth and snapclient on a Raspberry Pi
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python2.7
#
# This file should be placed next to bt-speaker (https://github.com/lukasjapan/bt-speaker).
# It does not permanently block the ALSA device and thus works alongside
# snapcast (https://github.com/badaix/snapcast).
#
# A GPIO pin of the Raspberry Pi is used to determine whether Bluetooth or snapclient should run.
#
import RPi.GPIO as GPIO | |
from bt_speaker import AutoAcceptSingleAudioAgent, config | |
from bt_manager.media import BTMedia | |
from bt_manager.agent import BTAgentManager | |
from bt_manager.audio import SBCAudioSink | |
from bt_manager.device import BTDevice | |
from gi.repository import GLib | |
import dbus.mainloop.glib | |
import subprocess | |
import alsaaudio | |
from threading import Thread, Event | |
import logging | |
class ASink(SBCAudioSink):
    """A2DP sink that forwards the audio handed to raw_audio() to ALSA.

    The ALSA PCM device is only referenced between startup() and stop(),
    so it is not held open while the sink is stopped.
    """

    def __init__(self):
        SBCAudioSink.__init__(self, path='/endpoint/a2dpsink')
        # alsaaudio.PCM while started, None while stopped
        self.process = None
        # alsaaudio.Mixer when volume control is enabled in the config
        self.alsamixer = None
        if config.getboolean('alsa', 'enabled'):
            # Hook into alsa service for volume control
            self.alsamixer = alsaaudio.Mixer(
                control=config.get('alsa', 'mixer'),
                id=int(config.get('alsa', 'id')),
                cardindex=int(config.get('alsa', 'cardindex'))
            )

    def start(self):
        """Alias for startup()."""
        self.startup()

    def startup(self):
        """Open and configure the ALSA PCM device; errors are logged."""
        try:
            pcm = alsaaudio.PCM(device=config.get('alsa', 'device'))
            pcm.setchannels(2)
            pcm.setrate(44100)
            pcm.setformat(alsaaudio.PCM_FORMAT_S16_LE)
            # Floor division keeps the period size an integer on Python 3 too.
            pcm.setperiodsize(44100 // 8)
            # Publish the device only once it is fully configured, so
            # raw_audio() never writes to a half-configured PCM.
            self.process = pcm
            self.volume(64)
        except Exception:
            logging.exception("alsa startup error")

    def stop(self):
        """Drop the reference to the PCM device.

        NOTE(review): the handle is not closed explicitly; presumably the
        device is released once the last reference (including one held by
        a write still in progress) goes away -- confirm against pyalsaaudio.
        """
        self.process = None

    def raw_audio(self, data):
        """Write one chunk of audio to the PCM device, if one is open."""
        # Snapshot the attribute: stop() may set it to None from another
        # thread between the check and the write.
        pcm = self.process
        if not pcm:
            return
        try:
            pcm.write(data)
        except Exception:
            logging.exception("alsa write error")

    def volume(self, new_volume):
        """Set the mixer volume from a value on a 0-127 scale.

        Does nothing when ALSA volume control is disabled in the config or
        no mixer was opened. Values outside 0-127 are clamped.
        """
        if not config.getboolean('alsa', 'enabled'):
            return
        if not self.alsamixer:
            return
        # normalize volume and keep it inside the range the mixer accepts
        volume = min(max(float(new_volume) / 127.0, 0.0), 1.0)
        print("Volume changed to %i%%" % (volume * 100.0))
        # alsamixer takes a percent value as integer from 0-100
        self.alsamixer.setvolume(int(volume * 100.0))
class OnOffAgent(AutoAcceptSingleAudioAgent):
    """Audio agent whose adapter can additionally be hidden on demand.

    Setting the `inhiding` event forces the adapter to be non-discoverable
    the next time update_discoverable() runs, regardless of whether a
    device is connected.
    """

    def __init__(self, connect_callback, disconnect_callback):
        # Created before the base initializer runs.
        # NOTE(review): presumably the base initializer may already call
        # update_discoverable() -- confirm against bt_speaker.
        self.inhiding = Event()
        AutoAcceptSingleAudioAgent.__init__(
            self, connect_callback, disconnect_callback
        )

    def update_discoverable(self):
        """Show or hide the adapter according to connection and hide state.

        Does nothing when the 'discoverable' option of the 'bluez' config
        section is off.
        """
        if config.getboolean('bluez', 'discoverable'):
            # Hidden while a device is connected or hiding was requested.
            hide = bool(self.connected) or self.inhiding.is_set()
            if hide:
                print("Hiding adapter from all devices.")
            else:
                print("Showing adapter to all devices.")
            self.adapter.set_property('Discoverable', not hide)
class DBusThread(Thread):
    """Thread that registers the Bluetooth sink and agent and runs GLib.

    `ready` is set once setup has finished, whether it succeeded or not.
    After a failed setup `agent`, `manager`, `media` or `sink` may still
    be None, so callers should check before using pause()/resume().
    """

    def __init__(self):
        Thread.__init__(self)
        self.ready = Event()
        self.sink = None
        self.media = None
        self.agent = None
        self.manager = None
        self.mainloop = None

    def kill(self):
        """Unregister agent and endpoint and stop the main loop.

        Safe to call when setup never completed. The main loop is stopped
        even if unregistering raises, so the thread can always terminate.
        """
        try:
            if self.manager is not None and self.agent is not None:
                self.manager.unregister_agent(self.agent._path)
            if self.media is not None and self.sink is not None:
                self.media.unregister_endpoint(self.sink._path)
        finally:
            if self.mainloop is not None:
                self.mainloop.quit()

    def resume(self):
        """Leave hiding mode and refresh the adapter's discoverability."""
        self.agent.inhiding.clear()
        self.agent.update_discoverable()

    def pause(self):
        """Hide the adapter, close the transport and stop the sink."""
        self.agent.inhiding.set()
        self.agent.update_discoverable()
        self.sink.close_transport()
        self.sink.volume(127)  # all full as snapcast seems to be softvol?
        self.sink.stop()
        # NOTE: the connected device is not disconnected here. An earlier
        # attempt via BTDevice(dev_path=self.agent.connected).disconnect()
        # failed with DBusException org.freedesktop.DBus.Error.UnknownMethod:
        # Method "Disconnect" with signature "" on interface
        # "org.bluez.Device" doesn't exist.

    def run(self):
        """Set up DBus, the sink and the agent, then run the main loop.

        A setup failure is logged and `ready` is still set, so a thread
        waiting on it is not blocked forever.
        """
        try:
            # Initialize the DBus SystemBus
            dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

            # Mainloop for communication
            self.mainloop = GLib.MainLoop()

            # setup bluetooth configuration
            self.sink = ASink()
            self.media = BTMedia(config.get('bluez', 'device_path'))
            self.media.register_endpoint(self.sink._path,
                                         self.sink.get_properties())

            # setup bluetooth agent (that manages connections of devices)
            self.agent = OnOffAgent(self._connect, self._disconnect)
            self.manager = BTAgentManager()
            self.manager.register_agent(self.agent._path, "NoInputNoOutput")
            self.manager.request_default_agent(self.agent._path)
        except Exception:
            logging.exception("bluetooth setup failed")
            self.ready.set()
            return

        # Run
        self.ready.set()
        self.mainloop.run()

    def _connect(self):
        """Run the configured connect command and wait for it to finish."""
        subprocess.call(config.get('bt_speaker', 'connect_command'),
                        shell=True)

    def _disconnect(self):
        """Close the transport, then run the configured disconnect command."""
        self.sink.close_transport()
        subprocess.call(config.get('bt_speaker', 'disconnect_command'),
                        shell=True)
if __name__ == '__main__':
    # Toggle between Bluetooth and snapclient according to one GPIO input.
    channel = 16  # pin number in BCM numbering
    btt = DBusThread()
    try:
        GPIO.setmode(GPIO.BCM)  # BCM
        btt.start()
        # Wait with a timeout: a bare wait() would block forever if the
        # thread dies before signalling readiness.
        while not btt.ready.wait(1.0):
            if not btt.is_alive():
                raise RuntimeError("bluetooth thread exited during setup")
        if btt.agent is None:
            raise RuntimeError("bluetooth setup failed")

        GPIO.setup(channel, GPIO.IN, pull_up_down=GPIO.PUD_UP)
        while True:
            if GPIO.input(channel) == 1:
                print('BT')
                # setup BT
                subprocess.call(['service', 'snapclient', 'stop'])
                btt.resume()
            else:
                print('snapcast')
                # setup snapcast
                btt.pause()
                subprocess.call(['service', 'snapclient', 'start'])
            # Block until the input changes in either direction.
            GPIO.wait_for_edge(channel, GPIO.BOTH)
    except KeyboardInterrupt:
        print('KeyboardInterrupt')
    except Exception:
        logging.exception("Stopping BTtoggle...")
    finally:
        # Always release the GPIO, even if stopping the thread fails.
        try:
            if btt.is_alive():
                btt.kill()
        finally:
            GPIO.cleanup()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment