Skip to content

Instantly share code, notes, and snippets.

@EarlGray
Last active July 7, 2023 22:48
Show Gist options
  • Save EarlGray/5e7ccad00772f61c3843b39f0d1175d2 to your computer and use it in GitHub Desktop.
Save EarlGray/5e7ccad00772f61c3843b39f0d1175d2 to your computer and use it in GitHub Desktop.
dmytrish's swaybar
#!/usr/bin/env python3
"""
A swaybar implementation.
Current plugins:
- keyboard layout state and tracking
- PulseAudio-compatible sound: device, volume and mute status
- power supply: on battery/ac, percentage, time to empty/full
- network: the name of the primary connection, `nmmenu` on click
- time in different formats
Use `status -a` to show all.
Dependencies:
- dbus_next
This script logs to `/tmp/swaybar.log` by default, use `SWAYBAR_LOG=/your/path`
to override.
"""
import os
import re
import sys
import json
import subprocess
import logging
import argparse
import traceback
import dataclasses
from enum import Enum
from datetime import datetime
from pathlib import Path
from collections import namedtuple
from dataclasses import dataclass
from typing import (Dict, Self, Union)
import asyncio
from asyncio import Queue
from asyncio.exceptions import CancelledError
from dbus_next import BusType
from dbus_next.signature import Variant
from dbus_next.aio import MessageBus
from dbus_next.aio.proxy_object import ProxyObject, ProxyInterface
# TODO: replace swaymsg invocations with sway ipc (`man sway-ipc`)
class Mouse(int, Enum):
    """Mouse button numbers as sway reports them in click events.

    Members are plain ints as well, so they compare equal to the raw
    `button` value of a click event.
    """

    # Regular buttons.
    BTN_LEFT = 1
    BTN_MIDDLE = 2
    BTN_RIGHT = 3

    # Scroll wheel.
    WHEEL_UP = 4
    WHEEL_DOWN = 5
@dataclass
class ClickEvent:
    """A swaybar-protocol click event.

    Field names match the keys of the JSON object that arrives on stdin.
    Values are stored exactly as received: `button` is therefore a raw int,
    which still compares equal to the corresponding `Mouse` member.
    """
    name: str          # name of the clicked block; used to look up its tracker
    button: Mouse
    event: str
    x: int
    y: int
    relative_x: int
    relative_y: int
    width: int
    height: int
    scale: float

    @classmethod
    def from_dict(cls, d) -> Self:
        """Build an event from a decoded JSON object, ignoring unknown keys.

        Raises:
            TypeError: if `d` lacks one of the required fields.
        """
        # Look in the class' own __dict__ rather than using hasattr(): an
        # inherited cache would make a subclass silently drop its own fields.
        if '_fields' not in cls.__dict__:
            cls._fields = frozenset(f.name for f in dataclasses.fields(cls))
        known = {k: v for k, v in d.items() if k in cls._fields}
        return cls(**known)
# Module-wide D-Bus connections. Both start out as None and are filled in on
# first use by dbus_system_bus() / dbus_session_bus(), then reused.
SYSTEM_BUS: MessageBus = None
SESSION_BUS: MessageBus = None
async def dbus_system_bus() -> MessageBus:
    """Return the shared system-bus connection, connecting on first use."""
    global SYSTEM_BUS
    if SYSTEM_BUS:
        return SYSTEM_BUS
    SYSTEM_BUS = await MessageBus(bus_type=BusType.SYSTEM).connect()
    return SYSTEM_BUS
async def dbus_session_bus() -> MessageBus:
    """Return the shared session-bus connection, connecting on first use."""
    global SESSION_BUS
    if SESSION_BUS:
        return SESSION_BUS
    SESSION_BUS = await MessageBus(bus_type=BusType.SESSION).connect()
    return SESSION_BUS
class Tracker:
    """Base class for one status-bar block driven by its own asyncio task.

    A subclass sets `NAME` and implements `run()`, which pushes
    `{NAME: value}` updates onto the queue it is given.  `on_click()` may be
    overridden to react to clicks on the block.
    """
    NAME: str = '???'
    DBUS_IF_PROPERTIES: str = 'org.freedesktop.DBus.Properties'

    def __init__(self):
        self.task = None

    def start(self, eventq: Queue):
        """Schedule the tracker on the running event loop; return its task."""
        self.task = asyncio.create_task(self._run(eventq))
        return self.task

    async def run(self, eventq: Queue):
        """Produce updates for `eventq`; must be provided by subclasses."""
        raise NotImplementedError(f'{type(self).__name__}.run()')

    async def _run(self, eventq):
        """Wrap `run()` with logging and a final placeholder update."""
        try:
            await self.run(eventq)
        except CancelledError:
            raise
        except Exception as e:
            logging.error(f'{self.NAME}: {e}')
            raise
        finally:
            logging.info(f'{self.NAME}: exited')
            # Leave a visible marker in the bar once the tracker is gone.
            await eventq.put({self.NAME: f'{self.NAME}: ---'})

    def stop(self):
        """Stop the tracker and report how it ended.

        Returns:
            The exception the task failed with, or its result, if the task
            had already finished on its own.  `None` if the tracker was never
            started, was cancelled, or has only just been asked to cancel
            (cancellation takes effect on a later loop iteration, so there is
            no outcome to report yet).
        """
        task = getattr(self, 'task', None)
        if task is None:
            return None
        if not task.done():
            # exception()/result() raise InvalidStateError on a pending task.
            task.cancel()
            return None
        if task.cancelled():
            # exception()/result() would re-raise CancelledError here.
            return None
        return task.exception() or task.result()

    def on_click(self, event: ClickEvent):
        """Default click handler: only log the event."""
        logging.info(f'{self.NAME}.on_click: {event}')

    @staticmethod
    async def run_subprocess(*cmd):
        """Spawn `cmd` and wait for it to exit."""
        proc = await asyncio.create_subprocess_exec(*cmd)
        await proc.wait()

    @staticmethod
    async def dbus_proxy(svc: str, path: str, iface: str) -> ProxyInterface:
        """Return interface `iface` of object `path` at `svc` on the system bus."""
        bus = await dbus_system_bus()
        objxml = await bus.introspect(svc, path)
        obj = bus.get_proxy_object(svc, path, objxml)
        return obj.get_interface(iface)
class TaskKeyboard(Tracker):
    """Show the active layout of each keyboard and follow layout changes.

    The initial layouts come from `swaymsg -t get_inputs`; afterwards the
    tracker listens to `swaymsg -t subscribe -m ["input"]`.  Any click
    switches every tracked keyboard to its next layout.
    """
    NAME = 'kbd'
    # Full XKB layout name => short label shown in the bar.
    KNOWN_LAYOUTS = {
        'English (US)': 'en',
        'Ukrainian': '🇺🇦',
    }

    @staticmethod
    def shorten_layout(name):
        """Return the short label for `name`, or `name` itself if unknown."""
        return TaskKeyboard.KNOWN_LAYOUTS.get(name, name)

    @staticmethod
    def get_inputs():
        """Return the decoded JSON output of `swaymsg -t get_inputs`."""
        inputs = subprocess.check_output('swaymsg -t get_inputs'.split())
        return json.loads(inputs.decode('utf-8'))

    def __init__(self):
        super().__init__()
        # input identifier => active layout name
        self.state = {
            inp['identifier']: inp['xkb_active_layout_name']
            for inp in TaskKeyboard.get_inputs()
            if inp['type'] == 'keyboard' and 'keyboard' in inp['name']
        }

    def _format(self):
        return 'kbd: ' + ','.join(self.shorten_layout(lt) for lt in self.state.values())

    async def run(self, eventq: Queue):
        # Bound before the `try` so that `finally` can test it even when the
        # subprocess was never started.
        proc = None
        try:
            await eventq.put({'kbd': self._format()})
            proc = await asyncio.create_subprocess_exec(
                *'swaymsg -t subscribe -m ["input"]'.split(),
                stdout=subprocess.PIPE,
            )
            while True:
                line = await proc.stdout.readline()
                if not line:
                    # EOF: swaymsg closed its stdout.
                    break
                text = line.decode('utf-8').strip()
                if not text:
                    continue
                event = json.loads(text)
                if event.get('change') != 'xkb_layout':
                    continue
                device = event['input']
                kbd = device['identifier']
                if kbd in self.state:
                    self.state[kbd] = device['xkb_active_layout_name']
                    await eventq.put({'kbd': self._format()})
            await proc.wait()
            logging.info(f'{self.NAME}: swaymsg exited ({proc.returncode})')
        finally:
            # Terminate first: the put below may block on a full queue.
            if proc is not None and proc.returncode is None:
                proc.terminate()
            await eventq.put({'kbd': 'kbd: --'})

    def on_click(self, event: ClickEvent):
        """Switch every tracked keyboard to its next layout."""
        for kbd in self.state:
            cmd = ['swaymsg', 'input', kbd, 'xkb_switch_layout', 'next']
            logging.info(f'{self.NAME}: {cmd}')
            subprocess.run(cmd, capture_output=True)
        logging.info(f'{self.NAME}.on_click: {event}')
class TaskTime(Tracker):
    """Show the current local time, refreshed once per second.

    A click cycles through `FORMATS`.
    """
    NAME = 'time'
    # strftime() patterns, cycled on click.
    FORMATS = [
        "%a %b %d, %T",
        "%b %d, %T",
        "%b %d, %H:%M",
        "%T",
    ]

    def __init__(self):
        super().__init__()
        self._format = 0      # index into FORMATS
        self.eventq = None    # set by run(); lets on_click() push a refresh

    def _tick(self):
        """Return an update carrying the current time in the active format."""
        fmt = self.FORMATS[self._format]
        return {self.NAME: datetime.now().strftime(fmt)}

    async def run(self, eventq: Queue):
        self.eventq = eventq
        while True:
            await self.eventq.put(self._tick())
            # Sleep until the start of the next wall-clock second.
            usec = datetime.now().microsecond
            await asyncio.sleep(1 - usec/1000000)

    def on_click(self, event: ClickEvent):
        """Advance to the next format and try to redraw immediately."""
        self._format = (self._format + 1) % len(self.FORMATS)
        logging.info(f'{self.NAME}.on_click: {self._format=}')
        if self.eventq is None:
            # Not running yet; the new format is picked up by the first tick.
            return
        try:
            self.eventq.put_nowait(self._tick())
        except asyncio.QueueFull:
            # The regular tick will show the new format within a second.
            pass
class TaskSound(Tracker):
    """Show the default sink, its volume and mute state via `pactl`.

    The text is refreshed whenever `pactl subscribe` reports a sink event.
    Clicks: right opens `pavucontrol`, middle toggles mute, the wheel changes
    the volume in 5% steps.
    """
    NAME = 'sound'
    # TODO: use `org.pulseaudio.Server` on DBus
    # but: pipewire/pipewire-pulse has no support for dbus at the moment.
    # rewrite this with `import pulsectl` instead?

    # Substring of the sink name => short label. I use these, add yours if needed.
    KNOWN_SINKS = {
        'hdmi-stereo': 'hdmi',
        'analog-stereo': 'spkr',
        'a2dp-sink': 'bluz',
    }
    # pactl's alias for the current default sink, so that clicks act on the
    # same sink whose volume is displayed.
    CONTROL_SINK = '@DEFAULT_SINK@'

    @staticmethod
    def pactl_sinks():
        """Return the decoded JSON output of `pactl -f json list sinks`."""
        output = subprocess.check_output('pactl -f json list sinks'.split())
        return json.loads(output.decode('utf-8'))

    @classmethod
    def sound_info(cls):
        """Return the bar text for the default sink, or 'snd: ???' on error."""
        try:
            info = subprocess.check_output('pactl -f json info'.split())
            info = json.loads(info.decode('utf-8'))
            snddev = info['default_sink_name']
            sink = {s['name']: s for s in cls.pactl_sinks()}[snddev]
            logging.debug(f'{cls.__name__}: sink = {json.dumps(sink)}')

            label = next(
                (short for key, short in cls.KNOWN_SINKS.items() if key in snddev),
                snddev,
            )
            # One entry per channel, in the order pactl lists them; assumes
            # every channel entry carries 'value_percent', as the stereo
            # ones do.
            percents = [str(ch['value_percent']) for ch in sink['volume'].values()]
            if len(set(percents)) > 1:
                level = '/'.join(percents)
            else:
                level = percents[0]
            vol = f'{label}: {level}'
            if sink['mute']:
                vol += ' \U0001F507'    # "muted speaker"
            return vol
        except Exception as e:
            logging.error(f'task_sound: {str(e)}')
            return 'snd: ???'

    async def run(self, eventq: Queue):
        event_re = re.compile(r"Event '(?P<event>.*)' on sink #(?P<id>\d+)")
        # initial value
        await eventq.put({'sound': self.sound_info()})
        # Bound before the `try` so that `finally` can always test it.
        proc = None
        try:
            proc = await asyncio.create_subprocess_exec(
                # TODO: pactl -f json subscribe ?
                'pactl', 'subscribe',
                stdout=subprocess.PIPE,
            )
            while True:
                line = await proc.stdout.readline()
                if not line:
                    # EOF. readline() keeps returning b'' without yielding,
                    # so looping on would starve the whole event loop.
                    break
                if event_re.match(line.decode('utf-8')):
                    await eventq.put({'sound': self.sound_info()})
            logging.info(f'pactl exited ({proc.returncode})')
        finally:
            if proc is not None and proc.returncode is None:
                proc.terminate()

    @classmethod
    def _pactl(cls, *args):
        """Run `pactl <args>` synchronously and log the outcome."""
        cmd = ['pactl', *args]
        proc = subprocess.run(cmd, capture_output=True)
        logging.info(f'{" ".join(cmd)} => {proc}')

    def on_click(self, event: ClickEvent):
        logging.info(f'sound.on_click: {event.button}')
        if event.button == Mouse.BTN_RIGHT:
            asyncio.create_task(
                self.run_subprocess('pavucontrol'),
            )
        elif event.button == Mouse.BTN_MIDDLE:
            self._pactl('set-sink-mute', self.CONTROL_SINK, 'toggle')
        elif event.button == Mouse.WHEEL_UP:
            self._pactl('set-sink-volume', self.CONTROL_SINK, '+5%')
        elif event.button == Mouse.WHEEL_DOWN:
            self._pactl('set-sink-volume', self.CONTROL_SINK, '-5%')
class TaskNetwork(Tracker):
    """Display short networking info from `nmcli`.

    - `wifi: <name>` when an active wireless connection exists
    - `<N> net` otherwise, N being the number of active non-loopback
      connections

    The text is recomputed for every line printed by `nmcli monitor`.
    On right click: run `nmmenu`.
    """
    NAME = 'net'
    # TODO: use `org.freedesktop.NetworkManager` on DBus
    # o.f.NM /o/f/NM o.f.NM.NetworkingEnabled
    # o.f.NM /o/f/NM o.f.NM.Connectivity
    # o.f.NM /o/f/NM o.f.NM.PrimaryConnection -> /o/f/NM/ActiveConnection/<N>
    # o.f.NM /o/f/NM o.f.NM.PrimaryConnectionType
    # o.f.NM /o/f/NM/ActiveConnection/<N> o.f.NM.Connection.Active.Ip4Config
    # \> o.f.NM /o/f/NM/IP4Config/<C> o.f.NM.IP4Config.AddressData
    # o.f.NM /o/f/NM/ActiveConnection/<N> o.f.NM.Connection.Active.Ip6Config
    # \> o.f.NM /o/f/NM/IP6Config/<C> o.f.NM.IP6Config.AddressData
    # Subscribe to /o/f/NM o.f.NM.Connection.StateChanged

    @staticmethod
    def _split_terse(line: str):
        """Split one `nmcli --terse` line into its unescaped fields.

        Terse output separates fields with ':' and backslash-escapes a ':'
        that occurs inside a value, so only unescaped colons are separators.
        """
        return [
            field.replace('\\:', ':').replace('\\\\', '\\')
            for field in re.split(r'(?<!\\):', line)
        ]

    @staticmethod
    def network_info() -> str:
        """Return the bar text for the currently active connections."""
        nmcli_output = subprocess.check_output(
            'nmcli --terse con show --active'.split(),
        ).decode('utf-8')

        active = 0
        for line in nmcli_output.splitlines():
            if not line:
                continue
            name, _uuid, typ, _iface = TaskNetwork._split_terse(line)
            if typ == '802-11-wireless':
                return f'wifi: {name}'
            if typ == 'loopback':
                continue
            active += 1
        # TODO: get routes and ips via
        # nmcli -t con show $names | grep -Ei '^(ip4|ip6|connection.id)'
        return f'{active} net'

    async def run(self, eventq: Queue):
        # Bound before the `try` so that `finally` can always test it.
        proc = None
        try:
            proc = await asyncio.create_subprocess_exec(
                *'nmcli monitor'.split(),
                stdout=subprocess.PIPE,
            )
            # initial information
            await eventq.put({'net': TaskNetwork.network_info()})
            while True:
                line = await proc.stdout.readline()
                if not line:
                    # EOF. readline() keeps returning b'' without yielding,
                    # so looping on would starve the whole event loop.
                    break
                # Any monitor output may mean a change: recompute.
                await eventq.put({'net': TaskNetwork.network_info()})
            logging.info('`nmcli monitor` exited')
        finally:
            if proc is not None and proc.returncode is None:
                proc.terminate()

    def on_click(self, event: ClickEvent):
        if event.button == Mouse.BTN_RIGHT:
            asyncio.create_task(
                self.run_subprocess(*'nmmenu --wrap -pnet'.split()),
            )
class TaskPower(Tracker):
    """Display power supply state read from UPower over D-Bus.

    - `ac: XX%` when neither a time-to-empty nor a time-to-full is reported
    - `ac: XX%, <time to full>` when a time-to-full is reported
    - `bat: XX%, <time to empty>` when a time-to-empty is reported

    The block is coloured by the active power profile:
    - `balanced`: default colours
    - `power-saver`: green text
    - `performance`: orange text
    - anything else (including a failed lookup): red background
    """
    NAME = 'power'
    DBUS_PWRPROF_SVC = 'net.hadess.PowerProfiles'
    DBUS_PWRPROF_PATH = '/net/hadess/PowerProfiles'
    DBUS_UPOWER_SVC = 'org.freedesktop.UPower'
    DBUS_UPOWER_PATH = '/org/freedesktop/UPower'
    DBUS_UPOWER_DEV_IF = 'org.freedesktop.UPower.Device'
    # TODO: discover these:
    DBUS_DEV_BAT0 = '/org/freedesktop/UPower/devices/battery_BAT0'
    DBUS_DEV_AC = '/org/freedesktop/UPower/devices/line_power_AC'

    # Active power profile => extra block attributes.
    PROFILE_STYLES = {
        'balanced': {},
        'power-saver': {'color': '#00ff00'},
        'performance': {'color': '#ffa000'},
    }
    UNKNOWN_PROFILE_STYLE = {'background': '#ff0000'}

    async def get_powerprofile(self) -> str:
        """Return the name of the active power profile."""
        if not hasattr(self, 'dbus_pwrprof'):
            self.dbus_pwrprof = await self.dbus_proxy(
                self.DBUS_PWRPROF_SVC,
                self.DBUS_PWRPROF_PATH,
                self.DBUS_PWRPROF_SVC,
            )
        return await self.dbus_pwrprof.get_active_profile()

    async def _get_upower_bat0(self) -> ProxyInterface:
        """Return the (cached) UPower device proxy for BAT0."""
        if not hasattr(self, 'dbus_bat0'):
            self.dbus_bat0 = await self.dbus_proxy(
                self.DBUS_UPOWER_SVC,
                self.DBUS_DEV_BAT0,
                self.DBUS_UPOWER_DEV_IF,
            )
        return self.dbus_bat0

    @staticmethod
    def _format_duration(seconds) -> str:
        """Format a number of seconds as `H:MM`, dropping leftover seconds."""
        hours, mins = divmod(int(seconds) // 60, 60)
        return f'{hours}:{mins:02d}'

    async def acpi_info(self) -> Union[str, Dict[str, str]]:
        """Return the block for the current battery state and power profile.

        Raises:
            Exception: whatever a failed battery property read raised.
        """
        bat0 = await self._get_upower_bat0()
        *battery, profile = await asyncio.gather(
            bat0.get_energy(),
            bat0.get_energy_full(),
            bat0.get_time_to_empty(),
            bat0.get_time_to_full(),
            self.get_powerprofile(),
            return_exceptions = True,
        )
        # A failed profile lookup is tolerated (it ends up styled as
        # "unknown" below), but nothing can be shown without battery numbers.
        for value in battery:
            if isinstance(value, BaseException):
                raise value
        energy, energy_full, time_empty, time_full = battery

        perc = int(100 * energy / energy_full) if energy_full else 0
        if time_empty != 0:
            full_text = f'bat: {perc}%, {self._format_duration(time_empty)}'
        elif time_full != 0:
            full_text = f'ac: {perc}%, {self._format_duration(time_full)}'
        else:
            full_text = f'ac: {perc}%'

        ret = { 'full_text': full_text }
        style = None
        if isinstance(profile, str):
            style = self.PROFILE_STYLES.get(profile)
        ret.update(self.UNKNOWN_PROFILE_STYLE if style is None else style)
        return ret

    async def run(self, eventq: Queue):
        # Set by the D-Bus callbacks below whenever a redraw is needed.
        sync = asyncio.Event()

        def on_upower_changed(interface_name, changed_properties, invalidated_properties):
            on_battery = changed_properties.get('OnBattery')
            if on_battery is not None:
                logging.info(f'UPower.OnBattery = {on_battery.value}')
                sync.set()

        def on_battery_update(interface_name, changed_properties, invalidated_properties):
            tfull = changed_properties.get('TimeToFull')
            tempty = changed_properties.get('TimeToEmpty')
            if tfull or tempty:
                sync.set()

        upower = await self.dbus_proxy(self.DBUS_UPOWER_SVC, self.DBUS_UPOWER_PATH, self.DBUS_IF_PROPERTIES)
        upower.on_properties_changed(on_upower_changed)
        bat0 = await self.dbus_proxy(self.DBUS_UPOWER_SVC, self.DBUS_DEV_BAT0, self.DBUS_IF_PROPERTIES)
        bat0.on_properties_changed(on_battery_update)
        pwrprof = await self.dbus_proxy(self.DBUS_PWRPROF_SVC, self.DBUS_PWRPROF_PATH, self.DBUS_IF_PROPERTIES)
        pwrprof.on_properties_changed(lambda ifname, chnged, invldted: sync.set())

        while True:
            info = await self.acpi_info()
            await eventq.put({'power': info})
            await sync.wait()
            sync.clear()

    # TODO: on_click(): change power profile
def setup_input(eventq: Queue):
    """Forward lines read from stdin to `eventq` as `{'input': <line>}`.

    Reading happens on a separate thread; each line is handed over to the
    event loop, where it is queued.
    """
    import fileinput
    from threading import Thread

    loop = asyncio.get_event_loop()

    def on_input(text):
        # Runs on the event loop thread.
        loop.create_task(eventq.put({'input': text}))

    def thread_input(callback):
        # Runs on the reader thread.
        try:
            for raw in fileinput.input(files='-'):
                text = raw.strip().strip(',')
                if text != '[':
                    loop.call_soon_threadsafe(callback, text)
        except Exception as err:
            subprocess.call(["notify-send", "thread_input: ERROR " + str(err)])

    reader = Thread(target=thread_input, args=[on_input])
    reader.start()
def _dispatch_click(tasks, raw):
    """Decode one click event line and pass it to the tracker it names."""
    try:
        ev = ClickEvent.from_dict(json.loads(raw))
        tasks[ev.name].on_click(ev)
    except Exception as e:
        logging.error(f'input parse error: {e}')


def _render_blocks(state):
    """Turn the `name => value` state into a list of swaybar blocks.

    A dict value contributes all of its keys to the block; any other value
    is used as the block text.
    """
    blocks = []
    for name, value in state.items():
        block = {'name': name}
        if isinstance(value, dict):
            block.update(value)
            full_text = value.get('full_text', '')
        else:
            full_text = value
        block['full_text'] = f' {full_text} '
        blocks.append(block)
    return blocks


async def task_main(tasks: Dict[str, Tracker]):
    """Run the trackers and print a status line for every event.

    Tracker updates and click events arrive on one queue; each event is
    followed by a full redraw.  On exit every tracker is stopped.
    """
    # https://man.archlinux.org/man/swaybar-protocol.7
    state = {}
    eventq = Queue(maxsize=1)
    try:
        setup_input(eventq)
        for t in tasks.values():
            state[t.NAME] = ''
            t.start(eventq)

        print(json.dumps({"version": 1, "click_events": True}))
        print('[')
        while True:
            event = await eventq.get()
            if 'input' in event:
                _dispatch_click(tasks, event.get('input'))
            else:
                state.update(event)
            # output
            print(json.dumps(_render_blocks(state)), end=",\n")
            sys.stdout.flush()
    except KeyboardInterrupt:
        pass
    except Exception as e:
        logging.error(f'task_main: {e}')

    for t in tasks.values():
        # One tracker failing to stop must not keep the others running.
        try:
            ret = t.stop()
        except (Exception, asyncio.CancelledError) as e:
            logging.warning(f'{t.NAME}: stop failed: {e!r}')
            continue
        if ret:
            logging.warning(f'{t.NAME}: {ret}')
def flags():
    """Build the command-line parser: one on/off switch per bar section."""
    parser = argparse.ArgumentParser(
        'swaytus',
        description='A status bar implementation for Sway',
    )
    # (short option, long option, help text)
    switches = (
        ('-a', '--all', 'Show everything'),
        ('-t', '--time', 'Show date and time'),
        ('-n', '--net', 'Show network status'),
        ('-k', '--kbd', 'Show and track keyboard layouts'),
        ('-s', '--sound', 'Show and track sound volume'),
        ('-p', '--power', 'Show and track power (battery level, etc)'),
    )
    for short_opt, long_opt, help_text in switches:
        parser.add_argument(
            short_opt, long_opt, action='store_true',
            help=help_text,
        )
    return parser
def main():
    """Parse options, set up file logging and run the selected trackers."""
    args = flags().parse_args()

    logging.basicConfig(
        filename=os.getenv('SWAYBAR_LOG') or "/tmp/swaybar.log",
        level=logging.INFO,
        format='%(asctime)s\t%(levelname)s\t%(module)s:%(message)s',
    )

    # (requested?, tracker class); trackers are created in this order.
    available = (
        (args.kbd, TaskKeyboard),
        (args.sound, TaskSound),
        (args.power, TaskPower),
        (args.net, TaskNetwork),
        (args.time, TaskTime),
    )
    trackers = {
        tracker_cls.NAME: tracker_cls()
        for requested, tracker_cls in available
        if requested or args.all
    }

    asyncio.run(task_main(trackers), debug=True)
    logging.info('oktnxbye')
# Start the bar only when this file is executed as a script.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment