Skip to content

Instantly share code, notes, and snippets.

@User65k
Created October 28, 2018 19:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save User65k/8b14149ca1b00dc9c8c1a8c5f552d8f6 to your computer and use it in GitHub Desktop.
Save User65k/8b14149ca1b00dc9c8c1a8c5f552d8f6 to your computer and use it in GitHub Desktop.
Switch between Bluetooth and snapclient on a Raspberry Pi
#!/usr/bin/python2.7
#
# This file should be placed next to bt-speaker (https://github.com/lukasjapan/bt-speaker).
# It does not permanently block the ALSA device and therefore works alongside
# snapcast (https://github.com/badaix/snapcast).
#
# A GPIO pin of the Raspberry Pi is used to determine whether Bluetooth or snapclient should run.
#
import RPi.GPIO as GPIO
from bt_speaker import AutoAcceptSingleAudioAgent, config
from bt_manager.media import BTMedia
from bt_manager.agent import BTAgentManager
from bt_manager.audio import SBCAudioSink
from bt_manager.device import BTDevice
from gi.repository import GLib
import dbus.mainloop.glib
import subprocess
import alsaaudio
from threading import Thread, Event
import logging
class ASink(SBCAudioSink):
    """A2DP sink endpoint that plays incoming audio through an ALSA PCM device.

    The PCM device is opened in ``startup()`` and the reference to it is
    dropped again in ``stop()``, so the device is only held between those two
    calls.  When the ``alsa.enabled`` config option is true, an ALSA mixer is
    additionally used for volume control.
    """

    # Playback parameters applied to the PCM device on startup.
    _CHANNELS = 2
    _RATE = 44100
    _PERIODS_PER_SECOND = 8

    def __init__(self):
        SBCAudioSink.__init__(self, path='/endpoint/a2dpsink')
        # alsaaudio.PCM instance while playing, otherwise None.
        self.process = None
        # alsaaudio.Mixer instance, or None when ALSA volume control is off.
        self.alsamixer = None

        if config.getboolean('alsa', 'enabled'):
            # Hook into alsa service for volume control
            self.alsamixer = alsaaudio.Mixer(
                control=config.get('alsa', 'mixer'),
                id=int(config.get('alsa', 'id')),
                cardindex=int(config.get('alsa', 'cardindex'))
            )

    def start(self):
        """Start playback; simply delegates to ``startup()``."""
        self.startup()

    def startup(self):
        """Open and configure the ALSA PCM device and set an initial volume.

        Any error is logged and swallowed, so a failing sound device does not
        propagate into the caller.
        """
        try:
            self.process = alsaaudio.PCM(device=config.get('alsa', 'device'))
            self.process.setchannels(self._CHANNELS)
            self.process.setrate(self._RATE)
            self.process.setformat(alsaaudio.PCM_FORMAT_S16_LE)
            # Floor division: the period size must be an integer, and plain
            # "/" only yields one on Python 2.
            self.process.setperiodsize(self._RATE // self._PERIODS_PER_SECOND)
            self.volume(64)
        except Exception:
            logging.exception("alsa startup error")

    def stop(self):
        """Stop playback by dropping the reference to the PCM device.

        NOTE(review): the device is presumably released once the PCM object
        is garbage-collected.  An explicit ``close()`` is intentionally not
        called here because ``stop()`` may run on a different thread than
        ``raw_audio()`` and could then close the device under a running write.
        """
        self.process = None

    def raw_audio(self, data):
        """Write one chunk of audio data to the PCM device, if it is open.

        :param data: audio buffer handed to ``alsaaudio.PCM.write`` unchanged.
        """
        # Read the attribute once so a concurrent stop() cannot turn it into
        # None between the check and the write.
        pcm = self.process
        if not pcm:
            return
        try:
            pcm.write(data)
        except Exception:
            logging.exception("alsa write error")

    def volume(self, new_volume):
        """Set the ALSA mixer volume.

        :param new_volume: volume on a 0-127 scale; values outside that range
            are clamped because the mixer only accepts 0-100 percent.
        """
        if not config.getboolean('alsa', 'enabled'):
            return
        if not self.alsamixer:
            return

        # normalize volume to the range 0.0 - 1.0
        volume = min(max(float(new_volume) / 127.0, 0.0), 1.0)
        print("Volume changed to %i%%" % (volume * 100.0))
        # alsamixer takes a percent value as integer from 0-100
        self.alsamixer.setvolume(int(volume * 100.0))
class OnOffAgent(AutoAcceptSingleAudioAgent):
    """Pairing agent whose discoverability can additionally be switched off.

    While the ``inhiding`` event is set, the adapter is kept hidden even if no
    device is connected.
    """

    def __init__(self, connect_callback, disconnect_callback):
        # Created before the base initialiser runs.
        # NOTE(review): presumably because the base class may already call
        # update_discoverable() during construction -- confirm.
        self.inhiding = Event()
        AutoAcceptSingleAudioAgent.__init__(
            self, connect_callback, disconnect_callback
        )

    def update_discoverable(self):
        """Show or hide the adapter according to connection and hiding state.

        Does nothing when the ``bluez.discoverable`` config option is false.
        """
        if not config.getboolean('bluez', 'discoverable'):
            return

        # Hide while a device is connected or while hiding was requested.
        should_hide = bool(self.connected) or self.inhiding.is_set()
        if should_hide:
            message = "Hiding adapter from all devices."
            discoverable = False
        else:
            message = "Showing adapter to all devices."
            discoverable = True

        print(message)
        self.adapter.set_property('Discoverable', discoverable)
class DBusThread(Thread):
    """Background thread that runs the GLib main loop for the bluetooth side.

    ``run()`` registers the audio sink endpoint and the pairing agent, sets
    the ``ready`` event and then blocks in the main loop until ``kill()`` is
    called.  ``pause()`` / ``resume()`` switch the bluetooth speaker off and
    on without tearing down the registration.
    """

    def __init__(self):
        Thread.__init__(self)
        # Set once setup in run() has finished (successfully or not).
        self.ready = Event()
        # All of the following stay None until run() has created them.
        self.sink = None
        self.media = None
        self.agent = None
        self.manager = None
        self.mainloop = None

    def kill(self):
        """Unregister agent and endpoint, then stop the main loop.

        Teardown is best effort: every step is attempted independently and
        failures are only logged, so one failing step cannot prevent the main
        loop from being stopped.  Safe to call before setup has completed.
        """
        if self.manager is not None and self.agent is not None:
            try:
                self.manager.unregister_agent(self.agent._path)
            except Exception:
                logging.exception("unregistering bluetooth agent failed")
        if self.media is not None and self.sink is not None:
            try:
                self.media.unregister_endpoint(self.sink._path)
            except Exception:
                logging.exception("unregistering media endpoint failed")
        if self.mainloop is not None:
            self.mainloop.quit()

    def resume(self):
        """Leave hiding mode and refresh the adapter's discoverability."""
        self.agent.inhiding.clear()
        self.agent.update_discoverable()

    def pause(self):
        """Enter hiding mode and release the audio path."""
        self.agent.inhiding.set()
        self.agent.update_discoverable()
        self.sink.close_transport()
        # Full mixer volume, as snapcast seems to use software volume
        # (unconfirmed).
        self.sink.volume(127)
        self.sink.stop()
        # NOTE: the connected device is not actively disconnected here.
        # BTDevice(dev_path=self.agent.connected).disconnect() was tried and
        # failed with org.freedesktop.DBus.Error.UnknownMethod: Method
        # "Disconnect" with signature "" on interface "org.bluez.Device"
        # doesn't exist.

    def run(self):
        """Set up DBus, the sink endpoint and the agent, then run the loop.

        ``ready`` is set even when setup fails, so a thread waiting on it is
        never blocked forever; the failure itself is logged.
        """
        try:
            # Initialize the DBus SystemBus
            dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

            # Mainloop for communication
            self.mainloop = GLib.MainLoop()

            # setup bluetooth configuration
            self.sink = ASink()
            self.media = BTMedia(config.get('bluez', 'device_path'))
            self.media.register_endpoint(
                self.sink._path, self.sink.get_properties()
            )

            # setup bluetooth agent (that manages connections of devices)
            self.agent = OnOffAgent(self._connect, self._disconnect)
            self.manager = BTAgentManager()
            self.manager.register_agent(self.agent._path, "NoInputNoOutput")
            self.manager.request_default_agent(self.agent._path)
        except Exception:
            logging.exception("bluetooth setup failed")
            return
        finally:
            self.ready.set()

        # Run
        self.mainloop.run()

    def _connect(self):
        """Agent callback: run the configured connect command."""
        # The command is a shell string taken from the config file.
        subprocess.call(config.get('bt_speaker', 'connect_command'), shell=True)

    def _disconnect(self):
        """Agent callback: close the transport, run the disconnect command."""
        self.sink.close_transport()
        # The command is a shell string taken from the config file.
        subprocess.call(
            config.get('bt_speaker', 'disconnect_command'), shell=True
        )
if __name__ == '__main__':
    # Script entry point: watch one GPIO pin and switch between the bluetooth
    # speaker and snapclient whenever its level changes.
    GPIO.setmode(GPIO.BCM)  # BCM
    channel = 16

    btt = DBusThread()
    btt.start()
    btt.ready.wait()

    try:
        # Internal pull-up enabled on the input pin.
        GPIO.setup(channel, GPIO.IN, pull_up_down=GPIO.PUD_UP)
        while True:
            if GPIO.input(channel) == 1:
                print('BT')
                # setup BT: stop snapclient first, then show the adapter
                subprocess.call(['service', 'snapclient', 'stop'])
                btt.resume()
            else:
                print('snapcast')
                # setup snapcast: release bluetooth audio, start snapclient
                btt.pause()
                subprocess.call(['service', 'snapclient', 'start'])
            # Block until the pin changes in either direction.
            GPIO.wait_for_edge(channel, GPIO.BOTH)
    except KeyboardInterrupt:
        print('KeyboardInterrupt')
    except Exception:
        logging.exception("Stopping BTtoggle...")
    finally:
        # Always tear down, whatever ended the loop; GPIO cleanup must run
        # even if stopping the bluetooth thread fails.
        try:
            btt.kill()
        finally:
            GPIO.cleanup()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment