Skip to content

Instantly share code, notes, and snippets.

@lsmag
Created February 15, 2017 00:11
Show Gist options
  • Save lsmag/00f0514b1e581771f208f84fdb6d2102 to your computer and use it in GitHub Desktop.
Save lsmag/00f0514b1e581771f208f84fdb6d2102 to your computer and use it in GitHub Desktop.
import asyncio
import sys
from asyncio import Queue
TAG_ALIGN_LEFT = '%{l}'
TAG_ALIGN_CENTER = '%{c}'
TAG_ALIGN_RIGHT = '%{r}'
UNDERLINE_OPEN = '%{+u}'
UNDERLINE_CLOSE = '%{-u}'
FOCUSED_TAG_SYMBOL = '#'
FOCUSED_TAG_UNFOCUSED_MONITOR_SYMBOL = '+'
UNFOCUSED_TAG_WITH_APPS_SYMBOL = ':'
UNUSED_TAG_SYMBOL = '.'
TAG_NEEDS_ATTENTION_SYMBOL = ':'
DARK_COLOR = '#343838'
LIGHT_COLOR = '#D1D1D1'
FOCUS_COLOR = '#0084CC'
ATTENTION_COLOR = '#900000'
def fg(color, text):
"""Formats foreground color for lemonbar"""
return ''.join(['%{F', color, '}', text, '%{F-}'])
def bg(color, text):
"""Formats background color for lemonbar"""
return ''.join(['%{B', color, '}', text, '%{B-}'])
def pattern(*, background, foreground, text):
"""Applies background and foreground colors to text for lemonbar"""
return bg(background, fg(foreground, ' {} '.format(text)))
async def get_hlwm_tags(loop):
coro = await asyncio.create_subprocess_exec('herbstclient', 'tag_status', loop=loop, stdout=asyncio.subprocess.PIPE)
line = await coro.stdout.readline()
tags = line.decode().strip().split('\t')
out = []
for tag in tags:
symbol, tag = tag[0], tag[1:]
if symbol == FOCUSED_TAG_SYMBOL:
out.append(pattern(
background=FOCUS_COLOR,
foreground=LIGHT_COLOR,
text=tag
# text=UNDERLINE_OPEN + tag + UNDERLINE_CLOSE
))
elif symbol in [FOCUSED_TAG_UNFOCUSED_MONITOR_SYMBOL, UNFOCUSED_TAG_WITH_APPS_SYMBOL]:
out.append(pattern(
background=LIGHT_COLOR,
foreground=DARK_COLOR,
text=tag
))
elif symbol == TAG_NEEDS_ATTENTION_SYMBOL:
out.append(pattern(
background=ATTENTION_COLOR,
foreground=LIGHT_COLOR,
text=tag
))
elif symbol == UNUSED_TAG_SYMBOL:
out.append(pattern(
background=DARK_COLOR,
foreground=LIGHT_COLOR,
text=tag
))
return ' '.join(out)
async def hlwm_tags_command(loop, queue, *, label):
first_tag_status = await get_hlwm_tags(loop)
queue.put_nowait((label, first_tag_status))
coro = await asyncio.create_subprocess_exec('herbstclient', '--idle', 'tag_changed', loop=loop, stdout=asyncio.subprocess.PIPE)
while True:
# we can safely ignore the output of the command above
await coro.stdout.readline()
tag_status = await get_hlwm_tags(loop)
queue.put_nowait((label, tag_status))
# tag_status_coro = await asyncio.create_subprocess_exec('herbstclient', ' tag_status')
async def run_command(asyncio_cmd, loop, command, queue, *, label, interval):
while True:
coro = await asyncio_cmd(*command, loop=loop, stdout=asyncio.subprocess.PIPE)
line = await coro.stdout.readline()
formatted_line = line.strip().decode() + ' '
queue.put_nowait((label, formatted_line))
await asyncio.sleep(interval)
def exec_command(*args, **kwargs):
return run_command(asyncio.create_subprocess_exec, *args, **kwargs)
def shell_command(*args, **kwargs):
return run_command(asyncio.create_subprocess_shell, *args, **kwargs)
async def queue_consumer(queue):
panel_format = ('{TAG_ALIGN_LEFT}'
'{hlwm_tags}'
'{TAG_ALIGN_CENTER}'
'MEM {memory}'
'SWAP {swap}'
'CPU {cpu}'
'TEMP {temperature}'
'{battery}'
'{datetime}'
'{timeutc}')
PLACEHOLDER = 'WIP '
results = {
'hlwm_tags': PLACEHOLDER,
'memory': PLACEHOLDER,
'swap': PLACEHOLDER,
'cpu': PLACEHOLDER,
'temperature': PLACEHOLDER,
'battery': PLACEHOLDER,
'datetime': PLACEHOLDER,
'timeutc': PLACEHOLDER
}
while True:
key, line = await queue.get()
results[key] = line
print(panel_format.format(
TAG_ALIGN_LEFT=TAG_ALIGN_LEFT,
TAG_ALIGN_CENTER=TAG_ALIGN_CENTER,
TAG_ALIGN_RIGHT=TAG_ALIGN_RIGHT,
**results
))
sys.stdout.flush()
def main():
loop = asyncio.get_event_loop()
queue = Queue(loop=loop)
hlwm_tags_coro = hlwm_tags_command(loop, queue, label='hlwm_tags')
battery_coro = exec_command(loop, ['/usr/lib/i3blocks/battery'], queue, label='battery', interval=30)
cpu_coro = exec_command(loop, ['/usr/lib/i3blocks/cpu_usage'], queue, label='cpu', interval=30)
temperature_coro = exec_command(loop, ['/usr/lib/i3blocks/temperature'], queue, label='temperature', interval=30)
swap_coro = shell_command(loop, ['BLOCK_INSTANCE=swap /usr/lib/i3blocks/memory'], queue, label='swap', interval=30)
memory_coro = exec_command(loop, ['/usr/lib/i3blocks/memory'], queue, label='memory', interval=30)
datetime_coro = exec_command(loop, ['date', '+%a %b %d %R %Z'], queue, label='datetime', interval=5)
timeutc_coro = exec_command(loop, ['date', '--utc', '+%R %Z'], queue, label='timeutc', interval=5)
loop.run_until_complete(asyncio.gather(
queue_consumer(queue),
hlwm_tags_coro,
battery_coro,
cpu_coro,
temperature_coro,
swap_coro,
memory_coro,
datetime_coro,
timeutc_coro,
))
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment