Created
February 15, 2017 00:11
-
-
Save lsmag/00f0514b1e581771f208f84fdb6d2102 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import sys | |
from asyncio import Queue | |
# lemonbar formatting tags used when composing the panel line.
TAG_ALIGN_LEFT = '%{l}'
TAG_ALIGN_CENTER = '%{c}'
TAG_ALIGN_RIGHT = '%{r}'
UNDERLINE_OPEN = '%{+u}'
UNDERLINE_CLOSE = '%{-u}'

# One-character state prefixes that `herbstclient tag_status` puts in front
# of every tag name. Each state must map to a distinct character, otherwise
# the later branch of the symbol dispatch can never be reached.
FOCUSED_TAG_SYMBOL = '#'
FOCUSED_TAG_UNFOCUSED_MONITOR_SYMBOL = '+'
UNFOCUSED_TAG_WITH_APPS_SYMBOL = ':'
UNUSED_TAG_SYMBOL = '.'
# herbstluftwm marks a tag containing an urgent window with '!'; this used
# to be ':' which collided with UNFOCUSED_TAG_WITH_APPS_SYMBOL.
TAG_NEEDS_ATTENTION_SYMBOL = '!'

# Color palette (hex strings) used for the tag backgrounds/foregrounds.
DARK_COLOR = '#343838'
LIGHT_COLOR = '#D1D1D1'
FOCUS_COLOR = '#0084CC'
ATTENTION_COLOR = '#900000'
def fg(color, text):
    """Wrap ``text`` in lemonbar foreground-color tags.

    The result opens with ``%{F<color>}`` and closes with ``%{F-}``, which
    resets the foreground color after the text.
    """
    opening = '%{F' + color + '}'
    closing = '%{F-}'
    return opening + text + closing
def bg(color, text):
    """Wrap ``text`` in lemonbar background-color tags.

    The result opens with ``%{B<color>}`` and closes with ``%{B-}``, which
    resets the background color after the text.
    """
    opening = '%{B' + color + '}'
    closing = '%{B-}'
    return opening + text + closing
def pattern(*, background, foreground, text):
    """Return ``text`` padded with one space per side and colored for lemonbar.

    The foreground tags are applied first (innermost) and the background
    tags wrap the result. All arguments are keyword-only.
    """
    padded = ' {} '.format(text)
    with_foreground = fg(foreground, padded)
    return bg(background, with_foreground)
def _hlwm_tag_colors(symbol):
    """Return the ``(background, foreground)`` colors for a tag-status symbol.

    Returns ``None`` for symbols this panel does not render. The checks are
    evaluated in order, so the first matching symbol constant wins.
    """
    if symbol == FOCUSED_TAG_SYMBOL:
        return FOCUS_COLOR, LIGHT_COLOR
    if symbol in (FOCUSED_TAG_UNFOCUSED_MONITOR_SYMBOL,
                  UNFOCUSED_TAG_WITH_APPS_SYMBOL):
        return LIGHT_COLOR, DARK_COLOR
    if symbol == TAG_NEEDS_ATTENTION_SYMBOL:
        return ATTENTION_COLOR, LIGHT_COLOR
    if symbol == UNUSED_TAG_SYMBOL:
        return DARK_COLOR, LIGHT_COLOR
    return None


async def get_hlwm_tags(loop):
    """Run ``herbstclient tag_status`` and format its tags for lemonbar.

    Args:
        loop: Accepted for backward compatibility only. It is no longer
            forwarded to ``asyncio.create_subprocess_exec`` because that
            keyword was removed in Python 3.10; the subprocess is started on
            the currently running event loop.

    Returns:
        The colored tag names joined by single spaces. Entries that are
        empty or whose state symbol is not recognized are skipped, so the
        result is ``''`` when the command prints nothing usable.
    """
    proc = await asyncio.create_subprocess_exec(
        'herbstclient', 'tag_status', stdout=asyncio.subprocess.PIPE)
    # communicate() drains stdout AND waits for the child to exit, so the
    # process is reaped instead of lingering as a zombie.
    stdout, _ = await proc.communicate()
    first_line = stdout.split(b'\n', 1)[0]

    out = []
    for entry in first_line.decode().strip().split('\t'):
        if not entry:
            # Empty output or doubled tabs: there is no symbol to inspect.
            continue
        colors = _hlwm_tag_colors(entry[0])
        if colors is None:
            continue
        background, foreground = colors
        out.append(pattern(
            background=background,
            foreground=foreground,
            text=entry[1:],
        ))
    return ' '.join(out)
async def hlwm_tags_command(loop, queue, *, label):
    """Publish the formatted tag list now and after every ``tag_changed`` hook.

    Args:
        loop: Passed through to ``get_hlwm_tags``. It is not forwarded to
            ``asyncio.create_subprocess_exec`` because that keyword was
            removed in Python 3.10.
        queue: Object with a ``put_nowait`` method; receives
            ``(label, tag_status)`` tuples.
        label: Key under which the tag status is published.

    Returns when the ``herbstclient --idle`` process closes its stdout.
    """
    first_tag_status = await get_hlwm_tags(loop)
    queue.put_nowait((label, first_tag_status))

    idle = await asyncio.create_subprocess_exec(
        'herbstclient', '--idle', 'tag_changed',
        stdout=asyncio.subprocess.PIPE)
    while True:
        # Only the fact that a line arrived matters, not its content.
        event = await idle.stdout.readline()
        if not event:
            # EOF: the idle process is gone. Without this check readline()
            # keeps returning b'' immediately and the loop would spin,
            # spawning `herbstclient tag_status` non-stop.
            break
        tag_status = await get_hlwm_tags(loop)
        queue.put_nowait((label, tag_status))
    # Reap the finished child process.
    await idle.wait()
async def run_command(asyncio_cmd, loop, command, queue, *, label, interval):
    """Run ``command`` forever, publishing its first output line each time.

    Args:
        asyncio_cmd: Coroutine function that spawns the process, e.g.
            ``asyncio.create_subprocess_exec`` or
            ``asyncio.create_subprocess_shell``. It is called as
            ``asyncio_cmd(*command, stdout=PIPE)``.
        loop: Accepted for backward compatibility only. It is no longer
            forwarded to ``asyncio_cmd`` because the asyncio subprocess
            helpers dropped the ``loop`` keyword in Python 3.10.
        command: Sequence of positional arguments for ``asyncio_cmd``.
        queue: Object with a ``put_nowait`` method; receives
            ``(label, text)`` tuples.
        label: Key under which the output is published.
        interval: Seconds to sleep between two runs.

    This coroutine never returns on its own.
    """
    while True:
        proc = await asyncio_cmd(*command, stdout=asyncio.subprocess.PIPE)
        # communicate() reads the output and waits for the process to exit,
        # so each run is reaped instead of leaving one zombie per interval.
        stdout, _ = await proc.communicate()
        first_line = stdout.split(b'\n', 1)[0]
        formatted_line = first_line.strip().decode() + ' '
        queue.put_nowait((label, formatted_line))
        await asyncio.sleep(interval)
def exec_command(*args, **kwargs):
    """Return a ``run_command`` coroutine that spawns via ``create_subprocess_exec``.

    All positional and keyword arguments are forwarded to ``run_command``
    after the spawner.
    """
    spawner = asyncio.create_subprocess_exec
    return run_command(spawner, *args, **kwargs)
def shell_command(*args, **kwargs):
    """Return a ``run_command`` coroutine that spawns via ``create_subprocess_shell``.

    All positional and keyword arguments are forwarded to ``run_command``
    after the spawner.
    """
    spawner = asyncio.create_subprocess_shell
    return run_command(spawner, *args, **kwargs)
async def queue_consumer(queue):
    """Print a full panel line every time a producer publishes a value.

    Items read from ``queue`` are ``(key, text)`` tuples. The latest text per
    key is remembered, and after each update the whole panel line is printed
    to stdout and flushed. Keys that have not reported yet show ``'WIP '``.
    This coroutine never returns on its own.
    """
    segments = (
        '{TAG_ALIGN_LEFT}',
        '{hlwm_tags}',
        '{TAG_ALIGN_CENTER}',
        'MEM {memory}',
        'SWAP {swap}',
        'CPU {cpu}',
        'TEMP {temperature}',
        '{battery}',
        '{datetime}',
        '{timeutc}',
    )
    template = ''.join(segments)

    placeholder = 'WIP '
    latest = dict.fromkeys(
        ('hlwm_tags', 'memory', 'swap', 'cpu',
         'temperature', 'battery', 'datetime', 'timeutc'),
        placeholder,
    )

    # The alignment tags never change, so collect them once.
    alignment = {
        'TAG_ALIGN_LEFT': TAG_ALIGN_LEFT,
        'TAG_ALIGN_CENTER': TAG_ALIGN_CENTER,
        'TAG_ALIGN_RIGHT': TAG_ALIGN_RIGHT,
    }

    while True:
        label, text = await queue.get()
        latest[label] = text
        print(template.format(**alignment, **latest))
        # Flush after every line so the reader of stdout sees it immediately.
        sys.stdout.flush()
async def _run_panel(loop):
    """Create the shared queue and run the consumer plus all producers."""
    # Queue's `loop` argument was removed in Python 3.10; creating the queue
    # inside the running loop works on every supported version.
    queue = Queue()
    await asyncio.gather(
        queue_consumer(queue),
        hlwm_tags_command(loop, queue, label='hlwm_tags'),
        exec_command(loop, ['/usr/lib/i3blocks/battery'], queue,
                     label='battery', interval=30),
        exec_command(loop, ['/usr/lib/i3blocks/cpu_usage'], queue,
                     label='cpu', interval=30),
        exec_command(loop, ['/usr/lib/i3blocks/temperature'], queue,
                     label='temperature', interval=30),
        shell_command(loop, ['BLOCK_INSTANCE=swap /usr/lib/i3blocks/memory'],
                      queue, label='swap', interval=30),
        exec_command(loop, ['/usr/lib/i3blocks/memory'], queue,
                     label='memory', interval=30),
        exec_command(loop, ['date', '+%a %b %d %R %Z'], queue,
                     label='datetime', interval=5),
        exec_command(loop, ['date', '--utc', '+%R %Z'], queue,
                     label='timeutc', interval=5),
    )


def main():
    """Start the event loop and run every panel producer and the consumer.

    Blocks until one of the coroutines raises or the process is interrupted;
    the event loop is closed on the way out.
    """
    # Create the loop explicitly: asyncio.get_event_loop() is deprecated when
    # no loop is running or set.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # gather() is called inside a coroutine so it binds to the running
        # loop instead of relying on the deprecated implicit lookup.
        loop.run_until_complete(_run_panel(loop))
    finally:
        loop.close()
# Run the panel only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment