Skip to content

Instantly share code, notes, and snippets.

@devilholk
Last active November 12, 2019 17:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save devilholk/c3c625df79351ed77b2c077c4ee8646b to your computer and use it in GitHub Desktop.
Save devilholk/c3c625df79351ed77b2c077c4ee8646b to your computer and use it in GitHub Desktop.
Pulseaudio volume balance monitor
#!/usr/bin/env python3
#encoding=utf-8
import subprocess, os, sys, re, signal, threading, gi
from collections import defaultdict
gi.require_version('Notify', '0.7')
from gi.repository import Notify
Notify.init("Pulseaudio volume balance monitor")
# --- Patterns used to pick apart the text output of `pactl list sinks` ---
# All of them are case-insensitive.

# A "Volume: ..." line; group 1 is everything after the label.
volume_pattern = re.compile(
    r'^\s*volume:\s+(.*)',
    re.IGNORECASE,
)
# A "Sink #<id>" header line; group 1 is the text after the '#'.
sink_id_pattern = re.compile(
    r'^\s*sink\s+#(.*)',
    re.IGNORECASE,
)
# A "Description: ..." line; group 1 is the description text.
description_pattern = re.compile(
    r'^\s*description:\s+(.*)',
    re.IGNORECASE,
)
# One "<name>: <raw> / <percent>% / <db> dB" channel entry inside a volume line.
volume_entry_pattern = re.compile(
    r'(^|\s+)(?P<name>.*?):\s+(?P<volume_raw>.*?)\s*/\s*(?P<volume_percent>.*?)\%\s*/\s*(?P<volume_db>.*?)\s+db',
    re.IGNORECASE,
)

# --- Events coordinating the subscription loop and the checker thread ---
check_volumes_ev = threading.Event()        # set to wake the checker thread
check_complete_ev = threading.Event()       # set when the checker is idle
exit_event_handler_ev = threading.Event()   # set to ask the checker to stop

# --- Child processes ---
# Handle of the `pactl subscribe` process; assigned once the loop starts.
subscription_process = None
def present_differing_channels(sink_list):
    """Show a desktop notification listing the unbalanced sinks.

    sink_list is an iterable of dicts, each providing the keys 'sink',
    'description' and 'differing_channels' (an iterable of channel names).
    One tab-indented row is produced per sink, with the channel names
    sorted and comma-separated.
    """
    header = 'The following sinks were detected to be unbalanced:'
    sink_rows = []
    for entry in sink_list:
        sink_id, description = entry['sink'], entry['description']
        dfl = ', '.join(sorted(entry['differing_channels']))
        sink_rows.append(f'\tSink #{sink_id} ({description}):\t{dfl}')
    formatted_notification = '\n'.join([header, *sink_rows])
    notification = Notify.Notification.new(
        "Unbalanced volume detected",
        formatted_notification,
        "dialog-information",
    )
    notification.show()
def run_command(*cmd):
    """Run *cmd* and return everything it wrote to standard output as text.

    The command is given as separate arguments (program first) and is
    executed without a shell. Its exit status is not inspected.

    The child runs with both LANG and LC_ALL set to 'C'. LANG alone is not
    enough: an inherited LC_ALL or LC_MESSAGES takes precedence over it and
    would leave the output localized, which breaks callers that match on
    the English labels.

    Bytes that cannot be decoded are replaced rather than raising, so a
    stray non-UTF-8 byte in the output cannot abort the caller.
    """
    mod_env = dict(os.environ)
    mod_env['LANG'] = 'C'
    mod_env['LC_ALL'] = 'C'  # overrides any inherited LC_* category
    completed = subprocess.run(cmd, stdout=subprocess.PIPE, env=mod_env)
    return completed.stdout.decode(sys.getdefaultencoding(), errors='replace')
def parse_volume(line):
    """Split the payload of a volume line into per-channel entries.

    Returns a list with one dict per match of volume_entry_pattern in
    *line*, holding that match's named groups ('name', 'volume_raw',
    'volume_percent', 'volume_db'). The list is empty when nothing matches.
    """
    channel_entries = []
    for hit in volume_entry_pattern.finditer(line):
        channel_entries.append(hit.groupdict())
    return channel_entries
def get_volumes():
    """Query `pactl list sinks` and collect volume and description per sink.

    Returns a defaultdict mapping each sink id (int) to a dict that may hold
    'volume' (the list produced by parse_volume for that sink's volume line)
    and 'description' (str). A key is only present if the corresponding
    line was seen for that sink.

    Raises AssertionError if a volume or description line appears before
    any sink header. The error message and the full command output are
    written to stderr before the exception propagates. The error is raised
    explicitly instead of via `assert`, so the check still runs under
    `python -O`.
    """
    result = defaultdict(dict)
    current_sink_id = None
    data = run_command('pactl', 'list', 'sinks')
    try:
        for line in data.split('\n'):
            v_match = volume_pattern.match(line)
            s_match = sink_id_pattern.match(line)
            d_match = description_pattern.match(line)
            if s_match:
                # Lines that follow belong to this sink until the next header.
                current_sink_id = int(s_match.group(1))
            if v_match:
                if current_sink_id is None:
                    raise AssertionError('Found volume before a current sink id!')
                result[current_sink_id]['volume'] = parse_volume(v_match.group(1))
            if d_match:
                if current_sink_id is None:
                    raise AssertionError('Found description before a current sink id!')
                result[current_sink_id]['description'] = d_match.group(1)
    except AssertionError as e:
        # Dump what was being parsed to make the failure diagnosable.
        print(e, file=sys.stderr)
        print(data, file=sys.stderr)
        raise
    return result
def check_for_balance():
    """Compare channel volumes of every sink and report unbalanced ones.

    For each sink returned by get_volumes(), the raw volume of the first
    channel is taken as the reference; every later channel whose raw volume
    differs is recorded together with the reference channel. If at least
    one sink has differing channels, present_differing_channels() is called
    once with a list of dicts (keys 'sink', 'description',
    'differing_channels').

    Sinks for which no volume entries were collected are skipped, and a
    missing description falls back to a placeholder, so an incomplete sink
    entry cannot raise KeyError.
    """
    unbalanced_sinks = []
    for sink, data in get_volumes().items():
        channels = data.get('volume') or []
        if not channels:
            continue  # nothing to compare for this sink
        reference = channels[0]
        differing_channels = set()
        for entry in channels[1:]:
            if entry['volume_raw'] != reference['volume_raw']:
                differing_channels.update((reference['name'], entry['name']))
        if differing_channels:
            unbalanced_sinks.append(dict(
                sink=sink,
                description=data.get('description', '<unknown>'),
                differing_channels=differing_channels,
            ))
    if unbalanced_sinks:
        present_differing_channels(unbalanced_sinks)
def volume_check_event_loop():
    """Worker loop: run a balance check each time check_volumes_ev is set.

    The loop ends when exit_event_handler_ev is found set, and that event is
    cleared on the way out. The exit flag is tested both right after waking
    up and right after a check finishes. Without the second test, a shutdown
    request arriving while check_for_balance() is running would be lost:
    check_volumes_ev is cleared after the check, which wipes the wake-up
    that came with the request, and the thread would block forever.

    check_complete_ev is cleared for the duration of a check and set again
    afterwards, so code that tests it can tell whether a check is running.
    """
    while True:
        check_volumes_ev.wait()
        if exit_event_handler_ev.is_set():
            break
        check_complete_ev.clear()   # a check is now in progress
        check_for_balance()
        # Drop wake-ups that arrived while the check was running.
        check_volumes_ev.clear()
        check_complete_ev.set()
        if exit_event_handler_ev.is_set():
            # Shutdown was requested during the check; its wake-up was just
            # cleared above, so leave now instead of waiting again.
            break
    exit_event_handler_ev.clear()
def pulse_subscription_event_loop():
    """Run `pactl subscribe` and request a volume check for each output line.

    The child process is stored in the module-level subscription_process.
    The content of each line is not inspected; any line triggers a check,
    provided check_complete_ev is set.
    NOTE(review): this presumably also fires for events that are not about
    sinks — confirm whether filtering is wanted.

    The pipe is opened with the default buffering. Line buffering
    (bufsize=1) is only supported for text-mode pipes, and lines are
    delivered as they arrive either way.

    When the output ends, for whatever reason, the child is reaped and the
    checker thread is told to stop, so the program does not hang on a
    worker that would otherwise wait forever.
    """
    global subscription_process
    cmd = ['pactl', 'subscribe']
    subscription_process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    check_complete_ev.set()  # start out in the "idle, ready to check" state
    try:
        for _ in subscription_process.stdout:
            if check_complete_ev.is_set():  # only wake the checker when it is idle
                check_volumes_ev.set()
    finally:
        subscription_process.wait()
        # Ask the checker thread to exit, then wake it so it sees the request.
        exit_event_handler_ev.set()
        check_volumes_ev.set()
def break_signal_handler(sig, frame):
    """Signal handler: ask the checker thread to stop and kill the subscriber.

    sig and frame are the standard signal-handler arguments and are unused.
    """
    # Raise the exit flag first, then wake the checker so it notices it.
    exit_event_handler_ev.set()
    check_volumes_ev.set()
    if not subscription_process:
        return
    # Probably not necessary, since the whole process group receives the
    # signal as well, but kill the subscription process explicitly anyway.
    subscription_process.kill()
# Install the SIGINT handler that stops the checker thread and kills the
# subscription process. This is done before anything is started.
signal.signal(signal.SIGINT, break_signal_handler)
# Start the checker thread; it sleeps until check_volumes_ev is set.
volume_event_thread = threading.Thread(target=volume_check_event_loop)
volume_event_thread.start()
# Run the subscription loop in the main thread; this blocks until the
# output of `pactl subscribe` ends.
pulse_subscription_event_loop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment