Created
October 12, 2017 07:55
-
-
Save jbg/a0e301e769c31f7a7abcd45c7f284046 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
from collections import OrderedDict | |
from datetime import datetime | |
from functools import partial | |
import json | |
import logging as log | |
import os | |
from queue import Queue, Empty | |
import subprocess | |
import sys | |
import threading | |
import time | |
import netifaces | |
from netifaces import AF_INET, AF_INET6 | |
import pulsectl | |
from pytz import timezone, utc | |
# Wireless interface queried through wpa_cli and iw.
WIRELESS_IFNAME = "wlp4s0"
# Entry under /sys/class/power_supply that is read for the battery block.
BATTERY = "BAT0"
# Seconds the output loop waits for a queued refresh request before it
# refreshes on its own.
MINOR_REFRESH_TIMEOUT = 10.0
# Number of timed-out "minor" refreshes performed between two "major" ones.
MAJOR_REFRESH_COUNTER = 3

# HOME may be unset (e.g. when started from a minimal environment); fall back
# to the expanded "~" so os.path.join never receives None.
home = os.environ.get("HOME") or os.path.expanduser("~")
_log_dir = os.path.join(home, ".local", "share", "statusbar")
# basicConfig opens the log file immediately and raises if its directory is
# missing, so make sure the directory exists first.
os.makedirs(_log_dir, exist_ok=True)
log.basicConfig(filename=os.path.join(_log_dir, "log"),
                level=log.INFO)
# Open file objects keyed by path. They are reused by read() and closed by
# output_loop() when it exits.
handles = {}


def fp(filename):
    """Return a cached text-mode read handle for ``filename``.

    The file is opened on the first request and the same file object is
    returned on every later request for that path.

    Raises:
        OSError: if the file cannot be opened (e.g. FileNotFoundError).
    """
    handle = handles.get(filename)
    if handle is None:
        handle = handles[filename] = open(filename, "r")
    return handle
def read(filename):
    """Return the whole content of ``filename`` with surrounding whitespace removed.

    The handle comes from fp() and is rewound before reading, so each call
    re-reads the file from its beginning.
    """
    handle = fp(filename)
    handle.seek(0)
    contents = handle.read()
    return contents.strip()
def block(**kwargs):
    """Return the keyword arguments as a dict describing one status block.

    ``separator_block_width`` is set to 55 unless the caller supplied it.
    """
    kwargs.setdefault("separator_block_width", 55)
    return kwargs
def dict_from_command(command, separator="="):
    """Run ``command`` and parse its standard output into a dict.

    Every output line containing ``separator`` is split at the first
    occurrence; the stripped left part becomes the key and the stripped right
    part the value. Lines without the separator are ignored, and a key that
    appears more than once keeps its last value.

    Args:
        command: argument list passed to ``subprocess.run``.
        separator: text that divides key from value on each line.

    Returns:
        dict mapping str keys to str values.
    """
    completed = subprocess.run(command, stdout=subprocess.PIPE)
    text = completed.stdout.decode("utf-8")
    parsed = {}
    for line in text.split("\n"):
        key, found, value = line.partition(separator)
        if found:
            parsed[key.strip()] = value.strip()
    return parsed
def wireless_block():
    """Build the status block describing the wireless link on WIRELESS_IFNAME.

    Both ``wpa_cli ... status`` and ``iw dev ... link`` are always queried.
    When wpa_cli reports ``wpa_state`` as COMPLETED the block is green and
    shows SSID, the first word of the tx bitrate, frequency and the WPA key
    management / group cipher / pairwise cipher; otherwise the block is red
    and shows the reported state.
    """
    wpa = dict_from_command(["/usr/bin/wpa_cli", "-i", WIRELESS_IFNAME, "status"])
    iw = dict_from_command(["/usr/bin/iw", "dev", WIRELESS_IFNAME, "link"], separator=": ")

    wpa_state = wpa.get("wpa_state")
    if wpa_state != "COMPLETED":
        return block(color="#ff0000", full_text="📶 %s" % wpa_state)

    ssid = iw.get("SSID", "none")
    # Keep only the leading token of the "tx bitrate" value.
    rate = iw.get("tx bitrate", "").split(" ", 1)[0]
    freq = iw.get("freq", "scanning")
    details = (ssid,
               rate,
               freq,
               wpa.get("key_mgmt"),
               wpa.get("group_cipher"),
               wpa.get("pairwise_cipher"))
    return block(color="#00ff00",
                 full_text="📶 ssid:%s rate:%s freq:%s wpa:%s/%s/%s" % details)
def default_gateway_block():
    """Build the status block summarising the IPv4 and IPv6 gateways.

    For each address family the element at index 1 of every gateway entry
    returned by ``netifaces.gateways()`` is joined with commas. A family
    without an entry is shown as "none" and turns the block red; otherwise
    the block is green.
    """
    # NOTE(review): index 1 of a netifaces gateway tuple is presumably the
    # interface name rather than the address; confirm this is the intended field.
    gateways = netifaces.gateways()
    color = "#00ff00"
    summaries = []
    for family in (AF_INET, AF_INET6):
        if family in gateways:
            summaries.append(",".join(entry[1] for entry in gateways[family]))
        else:
            summaries.append("none")
            color = "#ff0000"
    return block(full_text="📡 v4:%s v6:%s" % tuple(summaries), color=color)
def microphone_block():
    """Build the "mic" status block from the mute state of the PulseAudio sources.

    Sources whose name ends in ".monitor" are ignored. The block is red
    ("LISTENING") if at least one remaining source is not muted, and green
    ("DISABLED") if all of them are muted or there are none.
    """
    with pulsectl.Pulse("bar-mic-check") as pulse:
        inputs = [src for src in pulse.source_list()
                  if not src.name.endswith(".monitor")]
        listening = any(not src.mute for src in inputs)
        if listening:
            return block(name="mic", full_text="🎤 LISTENING", color="#ff0000")
        return block(name="mic", full_text="🎤 DISABLED", color="#00ff00")
def toggle_microphone():
    """Flip the mute state of every non-monitor PulseAudio source.

    If all such sources are currently muted they are all unmuted; otherwise
    (at least one is live) they are all muted.
    """
    with pulsectl.Pulse("bar-mic-toggle") as pulse:
        inputs = [src for src in pulse.source_list()
                  if not src.name.endswith(".monitor")]
        all_muted = all(src.mute for src in inputs)
        # 0 unmutes, 1 mutes.
        new_state = 0 if all_muted else 1
        for src in inputs:
            pulse.mute(src, new_state)
def battery_block():
    """Build the status block for the battery named by BATTERY.

    Reads ``status``, ``charge_now`` and ``charge_full`` from
    ``/sys/class/power_supply/<BATTERY>``. The charge is shown as a whole
    percentage, or as "?" when the values are not integers or the full
    charge is reported as zero. The block is green while charging or above
    80 %, red below 20 %, and has no colour otherwise. If any of the files
    does not exist a red "MISSING" block is returned.
    """
    try:
        base = "/sys/class/power_supply/" + BATTERY
        state = read(base + "/status")
        color = None
        try:
            charge_now = read(base + "/charge_now")
            charge_full = read(base + "/charge_full")
            charge = int(int(charge_now) / int(charge_full) * 100)
        except (ValueError, ZeroDivisionError):
            # Unparseable values, or a full charge of 0: show an unknown
            # level instead of letting the error end the output loop.
            charge = "?"
        else:
            if state == "Charging" or charge > 80:
                color = "#00ff00"
            elif charge < 20:
                color = "#ff0000"
        return block(full_text="🔋 %s %s%%" % (state, charge), color=color)
    except FileNotFoundError:
        return block(full_text="🔋 MISSING", color="#ff0000")
# Pick the Unicode clock-face character that looks closest to the given time.
def unicode_char_for_datetime(dt):
    """Return the clock-face character for ``dt.hour`` and ``dt.minute``.

    Minutes 0-15 show the current hour on the hour, minutes 16-44 show the
    current hour's half-past face, and minutes 45-59 show the next hour on
    the hour. Hours are reduced to the 1-12 range of a clock dial.
    """
    rounds_up = 1 if dt.minute >= 45 else 0
    # Map 0-24 onto 1-12 (0 and 12 and 24 all land on 12).
    hourish = (dt.hour + rounds_up - 1) % 12 + 1
    half_past = 15 < dt.minute < 45
    # 128336 (U+1F550) is "one o'clock"; the twelve half-past faces follow
    # directly after the twelve on-the-hour faces.
    return chr(128335 + hourish + (12 if half_past else 0))
def time_block(prefix, tz, short=False):
    """Build a clock status block for the current time in time zone ``tz``.

    Args:
        prefix: label placed between the clock symbol and the time.
        tz: tzinfo object the current time is converted to.
        short: when true, the block's full text is just "HH:MM"; otherwise
            the full text carries weekday and date as well and the "HH:MM"
            form is supplied as ``short_text``.

    Returns:
        dict produced by block().
    """
    # datetime.now(tz) yields the current time as an aware datetime in tz,
    # replacing the deprecated naive datetime.utcnow() round trip.
    dt = datetime.now(tz)
    symbol = unicode_char_for_datetime(dt)
    short_text = "%s %s %s" % (symbol, prefix, dt.strftime("%H:%M"))
    if short:
        return block(full_text=short_text)
    return block(full_text="%s %s %s" % (symbol, prefix, dt.strftime("%a %Y-%m-%d %H:%M")),
                 short_text=short_text)
def _named_time_block(name, **time_block_kwargs):
    """Bind time_block to fixed keyword arguments and label the result ``name``.

    output_loop picks a single task to refresh by comparing ``task.__name__``
    with the request, so each partial is given an explicit ``__name__``.
    """
    task = partial(time_block, **time_block_kwargs)
    task.__name__ = name
    return task


europe_time_block = _named_time_block(
    "europe_time_block", prefix="EU", tz=timezone("Europe/Berlin"))
shanghai_time_block = _named_time_block(
    "shanghai_time_block", prefix="CN", tz=timezone("Asia/Shanghai"), short=True)
auckland_time_block = _named_time_block(
    "auckland_time_block", prefix="NZ", tz=timezone("Pacific/Auckland"), short=True)

# Refresh requests ("major" or a task's __name__) passed from input_loop to
# output_loop.
queue = Queue()
def output_loop():
    """Write the status-line JSON stream to stdout until interrupted.

    A header object (``version`` 1, ``click_events`` enabled) is written
    first, followed by an opening "[" and then one JSON array of blocks per
    refresh, each terminated by ",\\n" and flushed.

    Refresh kinds:
      * "major": every block is recomputed.
      * "minor": only the three time blocks are recomputed.
      * any other value: the single task whose ``__name__`` equals it.

    After each line the loop waits up to MINOR_REFRESH_TIMEOUT seconds for a
    request on ``queue``. On timeout it performs a minor refresh while the
    counter is positive, otherwise a major refresh that resets the counter to
    MAJOR_REFRESH_COUNTER.

    On exit all cached file handles are closed and the closing "]" is
    written. KeyboardInterrupt and SystemExit propagate to the caller; any
    other exception is logged and ends the loop.
    """
    try:
        json.dump({"version": 1, "click_events": True}, sys.stdout)
        sys.stdout.write("\n[")
        # Task -> most recent block it produced; insertion order is the
        # order in which the blocks are emitted.
        tasks = OrderedDict([(wireless_block, None),
                             (default_gateway_block, None),
                             (microphone_block, None),
                             (battery_block, None),
                             (auckland_time_block, None),
                             (shanghai_time_block, None),
                             (europe_time_block, None)])
        refresh = "major"
        counter = MAJOR_REFRESH_COUNTER
        # maintain pytz!
        try:
            while True:
                if refresh == "minor":
                    for task in (europe_time_block, shanghai_time_block, auckland_time_block):
                        tasks[task] = task()
                elif refresh == "major":
                    for task in tasks:
                        tasks[task] = task()
                else:
                    task = next(task for task in tasks if task.__name__ == refresh)
                    tasks[task] = task()
                json.dump(list(tasks.values()), sys.stdout)
                sys.stdout.write(",\n")
                sys.stdout.flush()
                try:
                    refresh = queue.get(timeout=MINOR_REFRESH_TIMEOUT)
                except Empty:
                    if counter > 0:
                        refresh = "minor"
                        counter -= 1
                    else:
                        refresh = "major"
                        counter = MAJOR_REFRESH_COUNTER
        except KeyboardInterrupt:
            # Every line so far ended with a comma, so emit one last (empty)
            # element before the array is closed in the finally clause.
            sys.stdout.write("[]")
            raise
        finally:
            for key, handle in handles.items():
                try:
                    handle.close()
                except Exception:
                    log.exception("closing handle %s", key)
            sys.stdout.write("]")
            sys.stdout.flush()
    except Exception:
        # Deliberately not a bare except: KeyboardInterrupt and SystemExit
        # must reach the caller instead of being logged and swallowed.
        log.exception("exception in output loop")
def input_loop():
    """Read click events from stdin and queue the matching refresh request.

    Each line is expected to hold one JSON object, optionally preceded or
    followed by the comma that separates elements of the enclosing array.
    A line consisting of "[" is skipped and a line consisting of "]" ends
    the loop, as does end-of-file on stdin.

    An event whose ``name`` is "mic" toggles the microphone and requests a
    refresh of ``microphone_block``; every other event (including events
    without a name) requests a "major" refresh. Lines that are not valid
    JSON are logged and skipped. Any other exception is logged and ends the
    loop.
    """
    try:
        while True:
            raw = sys.stdin.readline()
            if not raw:
                # readline() returns "" only at EOF; without this check the
                # loop would spin forever logging decode errors.
                break
            line = raw.strip()
            if line == "[":
                continue
            if line == "]":
                break
            # Drop the array separator wherever the sender put it, instead
            # of blindly cutting off the last character.
            payload = line.strip(",").strip()
            if not payload:
                continue
            try:
                command = json.loads(payload)
            except ValueError:
                log.exception("exception decoding event JSON")
                continue
            # Events for blocks without a name carry no "name" key.
            name = command.get("name") if isinstance(command, dict) else None
            if name == "mic":
                toggle_microphone()
                queue.put("microphone_block")
            else:
                queue.put("major")
    except Exception:
        log.exception("exception in input loop")
def external_trigger_loop():
    """Placeholder for reacting to external state changes; does nothing yet.

    TODO: watch battery status, wireless status, pulseaudio status and
    gateway status for changes and trigger updates of the appropriate blocks.
    """
    return None
if __name__ == "__main__":
    # Run the click-event reader as a daemon thread: it blocks on stdin and
    # must not keep the process alive once output_loop has returned or been
    # interrupted.
    threading.Thread(target=input_loop, name="input", daemon=True).start()
    # Not started yet: external_trigger_loop is still a stub.
    #threading.Thread(target=external_trigger_loop, name="external").start()
    output_loop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment