Skip to content

Instantly share code, notes, and snippets.

@r10r
Created May 2, 2013 09:19
Show Gist options
  • Save r10r/5501136 to your computer and use it in GitHub Desktop.
Save r10r/5501136 to your computer and use it in GitHub Desktop.
Listen for PulseAudio device (sink or source) state changes/updates over D-Bus.
import dbus
import os
from subprocess import call
def pulse_bus_address():
    """Return the D-Bus address of the PulseAudio server.

    The address is read from the ``PULSE_DBUS_SERVER`` environment variable
    when that variable holds a non-empty value.  Otherwise it is fetched from
    the ``Address`` property of the ``org.PulseAudio.ServerLookup1`` interface
    on the session bus, and the discovered value is printed.

    Returns:
        The address string to hand to ``dbus.connection.Connection``.
    """
    # An exported-but-empty variable cannot be a usable address, so treat it
    # the same as an unset one and fall through to the session-bus lookup.
    address = os.environ.get('PULSE_DBUS_SERVER')
    if not address:
        bus = dbus.SessionBus()
        server_lookup = bus.get_object("org.PulseAudio1", "/org/pulseaudio/server_lookup1")
        address = server_lookup.Get(
            "org.PulseAudio.ServerLookup1",
            "Address",
            dbus_interface="org.freedesktop.DBus.Properties"
        )
        # Echo the looked-up address so the user can see where we connect.
        print(address)
    return address
# Set up the glib main loop.  DBusGMainLoop(set_as_default=True) is called
# before any connection is created below; the order of these statements is
# deliberate, so do not move the connection setup above it.
from dbus.mainloop.glib import DBusGMainLoop
DBusGMainLoop(set_as_default=True)
import gobject
# The loop object is only created here; it is started at the end of the script.
loop = gobject.MainLoop()
# Open a direct connection to the address returned by pulse_bus_address().
pulse_bus = dbus.connection.Connection(pulse_bus_address())
# Disabled example: read the current 'State' property of a single device.
# NOTE(review): it refers to `conn`, which is not defined in the visible code;
# presumably `pulse_bus` was meant -- confirm before re-enabling.
#source1 = conn.get_object(object_path='/org/pulseaudio/core1/source6')
#source1_status = source1.Get('org.PulseAudio.Core1.Device', 'State', dbus_interface='org.freedesktop.DBus.Properties')
#print(source1_status)
# Proxy for the core object; ListenForSignal is invoked on it below.
pulse_core = pulse_bus.get_object(object_path='/org/pulseaudio/core1')
# Disabled alternative: pass an empty object-path array instead of one device.
# NOTE(review): presumably this subscribes to the signal from every object --
# confirm against the PulseAudio D-Bus interface documentation.
# pulse_core.ListenForSignal('org.PulseAudio.Core1.Device.StateUpdated', dbus.Array(signature='o'))
# The watched device is hard-coded; 'source6' is specific to one machine's
# PulseAudio setup and will likely need adjusting elsewhere.
device_path = dbus.ObjectPath('/org/pulseaudio/core1/source6')
# Request Device.StateUpdated signals for just that device path.
pulse_core.ListenForSignal('org.PulseAudio.Core1.Device.StateUpdated', [device_path])
# Numeric device states compared against the value delivered to sig_handler.
DEVICE_RUNNING = 0
DEVICE_IDLE = 1
DEVICE_SUSPENDED = 2

# Human-readable line printed for each known state.
_STATE_MESSAGES = {
    DEVICE_RUNNING: "Device is now running.",
    DEVICE_IDLE: "Device is now idle.",
    DEVICE_SUSPENDED: "Device is now suspended",
}


def sig_handler(state):
    """Report a device state change on stdout.

    Always prints the raw state value.  For the three known states
    (DEVICE_RUNNING, DEVICE_IDLE, DEVICE_SUSPENDED) a descriptive line is
    printed as well; any other value produces no further output.

    Args:
        state: Numeric state value received with the signal.

    Returns:
        None.
    """
    print("State changed to %s" % state)
    message = _STATE_MESSAGES.get(state)
    if message is not None:
        print(message)
    # Hook point for reacting to a state, e.g. when state == DEVICE_SUSPENDED:
    # os.system("some_command with args")
# Route every 'StateUpdated' signal arriving on the PulseAudio connection to
# sig_handler; no interface or object-path filter is applied here.
pulse_bus.add_signal_receiver(
    handler_function=sig_handler,
    signal_name='StateUpdated'
)

# Start the main loop created earlier so queued signals get dispatched.
loop.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment