Skip to content

Instantly share code, notes, and snippets.

@jbg
Created October 12, 2017 07:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save jbg/a0e301e769c31f7a7abcd45c7f284046 to your computer and use it in GitHub Desktop.
Save jbg/a0e301e769c31f7a7abcd45c7f284046 to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
from collections import OrderedDict
from datetime import datetime
from functools import partial
import json
import logging as log
import os
from queue import Queue, Empty
import subprocess
import sys
import threading
import time
import netifaces
from netifaces import AF_INET, AF_INET6
import pulsectl
from pytz import timezone, utc
# Name of the wireless interface queried through wpa_cli and iw.
WIRELESS_IFNAME = "wlp4s0"
# Directory name of the battery under /sys/class/power_supply.
BATTERY = "BAT0"
# Seconds the output loop waits for a refresh request before refreshing on
# its own.
MINOR_REFRESH_TIMEOUT = 10.0
# Number of timer-driven minor refreshes between two timer-driven major ones.
MAJOR_REFRESH_COUNTER = 3

# Fall back to the account's home directory when $HOME is unset or empty so
# the path join below never receives None.
home = os.environ.get("HOME") or os.path.expanduser("~")
_log_dir = os.path.join(home, ".local", "share", "statusbar")
try:
    # basicConfig opens the log file immediately and fails when the directory
    # does not exist yet, so create it first.
    os.makedirs(_log_dir, exist_ok=True)
    log.basicConfig(filename=os.path.join(_log_dir, "log"), level=log.INFO)
except OSError:
    # Logging is best-effort: an unwritable log location must not prevent the
    # status bar from starting, so log to stderr instead.
    log.basicConfig(level=log.INFO)

# Cache of open file objects keyed by path, filled by fp().
handles = {}
def fp(filename):
    """Return a cached read-only text file object for ``filename``.

    The file is opened the first time it is requested and stored in the
    module-level ``handles`` dict; later calls return that same object.
    """
    try:
        return handles[filename]
    except KeyError:
        handle = handles[filename] = open(filename, "r")
        return handle
def read(filename):
    """Return the whole content of ``filename`` without surrounding whitespace.

    The file object comes from ``fp`` and is rewound before reading, so a
    handle that was read earlier is read again from the beginning.
    """
    handle = fp(filename)
    handle.seek(0)
    contents = handle.read()
    return contents.strip()
def block(**kwargs):
    """Return the keyword arguments as a status block dict.

    ``separator_block_width`` is set to 55 unless the caller supplied a
    value; every other key is passed through unchanged.
    """
    kwargs.setdefault("separator_block_width", 55)
    return kwargs
def dict_from_command(command, separator="=", timeout=5.0):
    """Run ``command`` and parse its standard output into a dict.

    Every output line that contains ``separator`` is split at the first
    occurrence; the stripped left part becomes the key and the stripped
    right part the value. Lines without the separator are ignored, and a
    later line with the same key replaces an earlier one.

    Args:
        command: Argument list handed to ``subprocess.run``.
        separator: String that divides key from value on each line.
        timeout: Seconds to wait for the command to finish.

    Returns:
        The parsed mapping. It is empty when the command cannot be started
        or does not finish within ``timeout``; the failure is logged so the
        caller can still render a block instead of crashing.
    """
    try:
        proc = subprocess.run(command, stdout=subprocess.PIPE, timeout=timeout)
    except (OSError, subprocess.TimeoutExpired):
        log.exception("running %s", command)
        return {}

    # Undecodable bytes are replaced instead of raising, since the text comes
    # from an external tool.
    output = proc.stdout.decode("utf-8", errors="replace")
    parsed = {}
    for line in output.split("\n"):
        key, found, value = line.partition(separator)
        if found:
            parsed[key.strip()] = value.strip()
    return parsed
def wireless_block():
    """Build the Wi-Fi block from ``wpa_cli status`` and ``iw dev ... link``.

    When wpa_cli reports ``wpa_state`` as ``COMPLETED`` the block is green
    and shows the SSID, the first token of the tx bitrate, the frequency and
    the key management / group cipher / pairwise cipher values. For any other
    state the block is red and shows that state.
    """
    wpa_status = dict_from_command(["/usr/bin/wpa_cli", "-i", WIRELESS_IFNAME, "status"])
    link = dict_from_command(["/usr/bin/iw", "dev", WIRELESS_IFNAME, "link"], separator=": ")

    state = wpa_status.get("wpa_state")
    if state != "COMPLETED":
        return block(color="#ff0000", full_text="📶 %s" % state)

    # Only the leading token of the "tx bitrate" value is displayed.
    bitrate = link.get("tx bitrate", "").split(" ", 1)[0]
    details = (
        link.get("SSID", "none"),
        bitrate,
        link.get("freq", "scanning"),
        wpa_status.get("key_mgmt"),
        wpa_status.get("group_cipher"),
        wpa_status.get("pairwise_cipher"),
    )
    return block(color="#00ff00",
                 full_text="📶 ssid:%s rate:%s freq:%s wpa:%s/%s/%s" % details)
def default_gateway_block():
    """Build the gateway block for the IPv4 and IPv6 address families.

    For each family, element 1 of every entry that ``netifaces.gateways()``
    lists is joined with commas. A family that is absent from the result is
    shown as ``none`` and turns the block red; otherwise the block is green.
    """
    gateways = netifaces.gateways()
    color = "#00ff00"
    shown = []
    for family in (AF_INET, AF_INET6):
        try:
            # NOTE(review): element 1 of a netifaces gateway entry looks like
            # the interface name rather than the gateway address - confirm
            # this is the intended display.
            shown.append(",".join(entry[1] for entry in gateways[family]))
        except KeyError:
            shown.append("none")
            color = "#ff0000"
    return block(full_text="📡 v4:%s v6:%s" % tuple(shown), color=color)
def microphone_block():
    """Build the microphone block from the PulseAudio source list.

    Sources whose name ends in ``.monitor`` are ignored. If any remaining
    source is unmuted the block is red and reads LISTENING; otherwise (also
    when there are no such sources) it is green and reads DISABLED.
    """
    with pulsectl.Pulse("bar-mic-check") as pulse:
        inputs = [src for src in pulse.source_list()
                  if not src.name.endswith(".monitor")]
        listening = any(not src.mute for src in inputs)

    if listening:
        return block(name="mic", full_text="🎤 LISTENING", color="#ff0000")
    return block(name="mic", full_text="🎤 DISABLED", color="#00ff00")
def toggle_microphone():
    """Flip the mute state of all non-monitor PulseAudio sources together.

    If every such source is currently muted they are all unmuted; if at
    least one is unmuted they are all muted.
    """
    with pulsectl.Pulse("bar-mic-toggle") as pulse:
        inputs = [src for src in pulse.source_list()
                  if not src.name.endswith(".monitor")]
        all_muted = all(src.mute for src in inputs)
        new_mute = 0 if all_muted else 1
        for src in inputs:
            pulse.mute(src, new_mute)
def battery_block():
    """Build the battery block from the sysfs files of ``BATTERY``.

    The block text shows the content of ``status`` and the charge as a whole
    percentage computed from ``charge_now`` / ``charge_full``. The colour is
    green while charging or above 80 %, red below 20 %, and unset otherwise.
    When the charge files do not contain usable numbers (non-integer text or
    a full charge of zero) the percentage is shown as ``?`` with no colour.

    Returns:
        The block dict; a red ``MISSING`` block when the files cannot be
        opened or read.
    """
    base = "/sys/class/power_supply/" + BATTERY
    try:
        state = read(base + "/status")
        color = None
        try:
            charge_now = read(base + "/charge_now")
            charge_full = read(base + "/charge_full")
            charge = int(int(charge_now) / int(charge_full) * 100)
        except (ValueError, ZeroDivisionError):
            # ZeroDivisionError: a full charge reported as 0 must not take
            # down the caller.
            charge = "?"
        else:
            if state == "Charging" or charge > 80:
                color = "#00ff00"
            elif charge < 20:
                color = "#ff0000"
        return block(full_text="🔋 %s %s%%" % (state, charge), color=color)
    except OSError:
        # FileNotFoundError is an OSError; catching the base class also covers
        # read errors on a cached handle, e.g. when the device goes away
        # after its files were opened.
        return block(full_text="🔋 MISSING", color="#ff0000")
def unicode_char_for_datetime(dt):
    """Return the Unicode clock-face character closest to the time in ``dt``.

    Minutes 0-15 map to the o'clock face of the current hour, minutes 16-44
    to its half-past face, and minutes 45-59 to the o'clock face of the
    following hour. Hours are reduced to the 12-hour dial.
    """
    # Code point just before the "one o'clock" face; the twelve o'clock faces
    # follow it, and the twelve half-past faces come directly after those.
    base = 128335

    dial_hour = dt.hour % 12 or 12
    if dt.minute >= 45:
        # Round up to the next hour, wrapping 12 back to 1.
        dial_hour = dial_hour % 12 + 1

    half_past = 15 < dt.minute < 45
    return chr(base + dial_hour + (12 if half_past else 0))
def time_block(prefix, tz, short=False):
    """Build a clock block showing the current time in ``tz``.

    Args:
        prefix: Label placed between the clock symbol and the time.
        tz: Time zone object the current time is converted to.
        short: When true, the block's ``full_text`` is the short
            ``HH:MM`` form and no ``short_text`` is set.

    Returns:
        A block whose ``full_text`` is the weekday, date and time (with the
        ``HH:MM`` form as ``short_text``), or only the short form when
        ``short`` is true.
    """
    # Start from an aware UTC timestamp; datetime.utcnow() is deprecated and
    # yields a naive value that has to be localised by hand.
    now = datetime.now(utc).astimezone(tz)
    symbol = unicode_char_for_datetime(now)
    short_text = "%s %s %s" % (symbol, prefix, now.strftime("%H:%M"))
    if short:
        return block(full_text=short_text)
    long_text = "%s %s %s" % (symbol, prefix, now.strftime("%a %Y-%m-%d %H:%M"))
    return block(full_text=long_text, short_text=short_text)
def _zone_clock(name, **bound):
    """Return ``time_block`` with ``bound`` pre-applied and ``__name__`` set.

    ``functools.partial`` objects have no ``__name__`` of their own, and the
    output loop selects a single block to refresh by that attribute.
    """
    task = partial(time_block, **bound)
    task.__name__ = name
    return task


europe_time_block = _zone_clock(
    "europe_time_block", prefix="EU", tz=timezone("Europe/Berlin"))
shanghai_time_block = _zone_clock(
    "shanghai_time_block", prefix="CN", tz=timezone("Asia/Shanghai"), short=True)
auckland_time_block = _zone_clock(
    "auckland_time_block", prefix="NZ", tz=timezone("Pacific/Auckland"), short=True)

# Refresh requests ("major" or a block function name) sent from the input
# loop to the output loop.
queue = Queue()
def _refresh_tasks(tasks, refresh):
    """Re-run the block functions selected by ``refresh`` and store the results.

    ``"minor"`` re-runs only the three clock blocks, ``"major"`` re-runs every
    block, and any other value re-runs the block whose ``__name__`` equals it.
    A value that matches no block is logged and treated like ``"major"``.
    """
    if refresh == "minor":
        selected = (europe_time_block, shanghai_time_block, auckland_time_block)
    elif refresh == "major":
        selected = tuple(tasks)
    else:
        match = next((task for task in tasks if task.__name__ == refresh), None)
        if match is None:
            # A bare next() would raise StopIteration here and end the loop.
            log.warning("unknown refresh request %r; refreshing all blocks", refresh)
            selected = tuple(tasks)
        else:
            selected = (match,)
    for task in selected:
        tasks[task] = task()


def output_loop():
    """Write the i3bar-protocol status stream to stdout until interrupted.

    Emits the protocol header and opens the endless JSON array, then prints
    one status line per iteration. After each line it waits up to
    ``MINOR_REFRESH_TIMEOUT`` seconds on the module-level ``queue`` for a
    refresh request. When the wait times out it schedules a minor refresh,
    except that after ``MAJOR_REFRESH_COUNTER`` timer-driven minor refreshes
    the next timeout schedules a major refresh and resets the counter.

    On the way out, every cached file handle is closed and the JSON array is
    terminated. Exceptions derived from ``Exception`` are logged and end the
    loop; ``KeyboardInterrupt`` and other non-``Exception`` exits propagate.
    """
    try:
        json.dump({"version": 1, "click_events": True}, sys.stdout)
        sys.stdout.write("\n[")

        # Insertion order is the order of the blocks in each status line.
        tasks = OrderedDict([(wireless_block, None),
                             (default_gateway_block, None),
                             (microphone_block, None),
                             (battery_block, None),
                             (auckland_time_block, None),
                             (shanghai_time_block, None),
                             (europe_time_block, None)])
        refresh = "major"
        counter = MAJOR_REFRESH_COUNTER
        # maintain pytz!
        # (original author's note; presumably a reminder to keep the pytz
        # time zone data current)
        try:
            while True:
                _refresh_tasks(tasks, refresh)

                json.dump(list(tasks.values()), sys.stdout)
                sys.stdout.write(",\n")
                sys.stdout.flush()

                try:
                    refresh = queue.get(timeout=MINOR_REFRESH_TIMEOUT)
                except Empty:
                    if counter > 0:
                        refresh = "minor"
                        counter -= 1
                    else:
                        refresh = "major"
                        counter = MAJOR_REFRESH_COUNTER
        except KeyboardInterrupt:
            # Every emitted line ends with a comma, so add a final (empty)
            # element before the array is closed below.
            sys.stdout.write("[]")
            raise
        finally:
            for key, handle in handles.items():
                try:
                    handle.close()
                except Exception:
                    log.exception("closing handle %s", key)
            sys.stdout.write("]")
            sys.stdout.flush()
    except Exception:
        # KeyboardInterrupt is not an Exception subclass and keeps propagating.
        log.exception("exception in output loop")
def input_loop():
    """Read click events from stdin and turn them into refresh requests.

    Each line is expected to hold one JSON object of the endless event array;
    the array brackets arrive on lines of their own. A click on the block
    named ``mic`` toggles the microphone and requests a refresh of
    ``microphone_block``; any other click requests a major refresh.

    The loop ends on the closing ``]`` or when stdin reaches end of file.
    Lines that are not valid JSON are logged and skipped. Any other
    ``Exception`` is logged and ends the loop.
    """
    try:
        while True:
            raw = sys.stdin.readline()
            if not raw:
                # readline() returns "" only at EOF; without this check the
                # loop would spin forever on decode errors.
                break
            line = raw.strip()
            if not line or line == "[":
                continue
            if line == "]":
                break
            try:
                # Array elements are separated by a comma that may sit at the
                # end of a line or at the start of the next, depending on the
                # bar implementation; a JSON object never starts or ends with
                # a comma, so stripping both sides is safe.
                command = json.loads(line.strip(","))
            except ValueError:
                log.exception("exception decoding event JSON")
                continue
            if not isinstance(command, dict):
                log.warning("ignoring non-object click event: %r", command)
                continue
            # Events for blocks without a name carry no "name" key; indexing
            # would raise KeyError and end this thread.
            if command.get("name") == "mic":
                toggle_microphone()
                queue.put("microphone_block")
            else:
                queue.put("major")
    except Exception:
        log.exception("exception in input loop")
def external_trigger_loop():
    """Placeholder for change-driven refreshes; currently does nothing.

    TODO: watch battery status, wireless status, pulseaudio status and
    gateway status for changes and trigger updates of the appropriate blocks.
    """
    return None
if __name__ == "__main__":
    # The click reader blocks on stdin indefinitely. Run it as a daemon
    # thread so it cannot keep the process alive once output_loop() has
    # returned or been interrupted.
    threading.Thread(target=input_loop, name="input", daemon=True).start()
    #threading.Thread(target=external_trigger_loop, name="external").start()
    output_loop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment