Last active
November 12, 2019 17:14
-
-
Save devilholk/c3c625df79351ed77b2c077c4ee8646b to your computer and use it in GitHub Desktop.
Pulseaudio volume balance monitor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
#encoding=utf-8 | |
import subprocess, os, sys, re, signal, threading, gi | |
from collections import defaultdict | |
gi.require_version('Notify', '0.7') | |
from gi.repository import Notify | |
Notify.init("Pulseaudio volume balance monitor") | |
#Volumes | |
volume_pattern = re.compile(r'^\s*volume:\s+(.*)', re.I) | |
sink_id_pattern = re.compile(r'^\s*sink\s+#(.*)', re.I) | |
description_pattern = re.compile(r'^\s*description:\s+(.*)', re.I) | |
volume_entry_pattern = re.compile(r'(^|\s+)(?P<name>.*?):\s+(?P<volume_raw>.*?)\s*/\s*(?P<volume_percent>.*?)\%\s*/\s*(?P<volume_db>.*?)\s+db', re.I) | |
#Events | |
check_volumes_ev = threading.Event() | |
check_complete_ev = threading.Event() | |
exit_event_handler_ev = threading.Event() | |
#Processes | |
subscription_process = None | |
def present_differing_channels(sink_list): | |
info_rows = ['The following sinks were detected to be unbalanced:'] | |
for entry in sink_list: | |
sink_id = entry['sink'] | |
description = entry['description'] | |
differences = entry['differing_channels'] | |
dfl = ', '.join(sorted(differences)) | |
info_rows.append(f'\tSink #{sink_id} ({description}):\t{dfl}') | |
formatted_notification = '\n'.join(info_rows) | |
Notify.Notification.new("Unbalanced volume detected", formatted_notification, "dialog-information").show() | |
def run_command(*cmd): | |
mod_env = dict(**os.environ) | |
mod_env['LANG'] = 'C' | |
p = subprocess.Popen(cmd, stdout=subprocess.PIPE,env=mod_env) | |
raw_stdout, dummy = p.communicate() | |
stdout = raw_stdout.decode(sys.getdefaultencoding()) | |
return stdout | |
def parse_volume(line): | |
return [match.groupdict() for match in volume_entry_pattern.finditer(line)] | |
def get_volumes(): | |
result = defaultdict(dict) | |
current_sink_id = None | |
data = run_command('pactl', 'list', 'sinks') | |
try: | |
for line in data.split('\n'): | |
v_match = volume_pattern.match(line) | |
s_match = sink_id_pattern.match(line) | |
d_match = description_pattern.match(line) | |
if s_match: | |
current_sink_id = int(s_match.group(1)) | |
if v_match: | |
assert current_sink_id != None, 'Found volume before a current sink id!' | |
result[current_sink_id]['volume'] = parse_volume(v_match.group(1)) | |
if d_match: | |
assert current_sink_id != None, 'Found description before a current sink id!' | |
result[current_sink_id]['description'] = d_match.group(1) | |
except AssertionError as e: | |
print(e, file=sys.stderr) | |
print(data, file=sys.stderr) | |
raise | |
return result | |
def check_for_balance(): | |
current_volumes = get_volumes() | |
unbalanced_sinks = list() | |
for sink, data in current_volumes.items(): | |
current_volume = None | |
current_channel = None | |
differing_channels = set() | |
for entry in data['volume']: | |
if current_volume == None: | |
current_volume = entry['volume_raw'] | |
current_channel = entry['name'] | |
if entry['volume_raw'] != current_volume: | |
differing_channels |= {current_channel, entry['name']} | |
if differing_channels: | |
unbalanced_sinks.append(dict( | |
sink=sink, | |
description=data['description'], | |
differing_channels=differing_channels, | |
)) | |
if unbalanced_sinks: | |
present_differing_channels(unbalanced_sinks) | |
def volume_check_event_loop(): | |
while True: | |
check_volumes_ev.wait() | |
if exit_event_handler_ev.is_set(): | |
exit_event_handler_ev.clear() | |
return | |
check_for_balance() | |
check_volumes_ev.clear() | |
check_complete_ev.set() | |
def pulse_subscription_event_loop(): | |
global subscription_process | |
cmd = ['pactl', 'subscribe'] | |
subscription_process = subprocess.Popen(cmd, bufsize=1, stdout=subprocess.PIPE) #line buffered | |
check_complete_ev.set() #Let's start by being in state complete | |
for line in subscription_process.stdout: | |
if check_complete_ev.is_set(): #Make sure we are ready to check volumes | |
check_volumes_ev.set() | |
def break_signal_handler(sig, frame): | |
exit_event_handler_ev.set() | |
check_volumes_ev.set() | |
if subscription_process: #Not necersary sicne the entire processgroup will receive sigkill but I'll add it any way because I am unsure about these things | |
subscription_process.kill() | |
#Install signal handler for killing event thread and subscription process | |
signal.signal(signal.SIGINT, break_signal_handler) | |
#Run event loop | |
volume_event_thread = threading.Thread(target=volume_check_event_loop) | |
volume_event_thread.start() | |
pulse_subscription_event_loop() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment