Skip to content

Instantly share code, notes, and snippets.

@dylanwh
Created September 29, 2022 01:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dylanwh/8545205543443a7e4afcf11e4a1caeb4 to your computer and use it in GitHub Desktop.
Save dylanwh/8545205543443a7e4afcf11e4a1caeb4 to your computer and use it in GitHub Desktop.
from contextlib import contextmanager
import subprocess
import re
import sys
import time
import curses
# usage:
# cmd = ['ls', '-l']
# pattern = re.compile('beans')
# with watch_output(cmd, stop_regex=pattern) as out:
#     # do stuff with `out`, the command output that matched the pattern
@contextmanager
def watch_output(cmd, **kwargs):
    """Re-run *cmd* on a curses screen until its output matches a pattern.

    Works like a minimal ``watch(1)``: the command is executed, its output is
    drawn below a status line, and after ``interval`` seconds it is run again.
    All polling happens while the context is being entered; the ``with`` body
    only runs after the pattern has matched and ``curses.wrapper`` has
    returned.

    Args:
        cmd: Command passed to ``subprocess.check_output``. Shown on the
            status line (a sequence is joined with spaces).
        **kwargs: ``stop_regex`` (required) is a compiled pattern; polling
            stops once it is found in the command output. ``interval``
            (optional, default 1) is the pause in seconds between runs.
            Every other keyword is forwarded to ``subprocess.check_output``.

    Yields:
        The output of the run that matched, exactly as returned by
        ``subprocess.check_output``.

    Raises:
        ValueError: If ``stop_regex`` is not supplied.
        subprocess.CalledProcessError: If the command exits non-zero.
    """
    if 'stop_regex' not in kwargs:
        raise ValueError('stop_regex is required')
    stop_regex = kwargs.pop('stop_regex')
    interval = kwargs.pop('interval', 1)

    # re refuses to search bytes with a str pattern (and vice versa), so the
    # output is converted to whichever type the pattern was compiled from.
    pattern_is_bytes = isinstance(getattr(stop_regex, 'pattern', ''), bytes)

    # Loop-invariant; a plain string command (shell=True) is shown verbatim.
    if isinstance(cmd, str):
        command_text = cmd
    else:
        command_text = ' '.join(str(arg) for arg in cmd)
    status_widget = "Every {}s: {}".format(interval, command_text)

    def to_text(out):
        # check_output returns str when the caller asked for text mode.
        if isinstance(out, bytes):
            return out.decode('utf-8', errors='replace')
        return out

    def matched(out, text):
        if pattern_is_bytes:
            haystack = out if isinstance(out, bytes) else out.encode('utf-8')
        else:
            haystack = text
        return stop_regex.search(haystack) is not None

    def draw(stdscr, text):
        # Ask the window for its size so a resized terminal is honoured.
        rows, cols = stdscr.getmaxyx()
        width = max(cols - 1, 0)  # never touch the last column: no wrapping
        time_widget = time.ctime()
        padding = ' ' * max(width - len(status_widget) - len(time_widget), 1)

        # Row 0 is the header, row 1 stays blank, the rest shows the tail of
        # the output (an explicit guard because lines[-0:] is the whole list).
        body_rows = max(rows - 2, 0)
        lines = text.splitlines()
        visible = lines[-body_rows:] if body_rows else []

        stdscr.erase()
        try:
            stdscr.addnstr(0, 0, status_widget + padding + time_widget, width)
            for row, line in enumerate(visible, start=2):
                stdscr.addnstr(row, 0, line, width)
        except curses.error:
            # Tabs / wide characters can still overflow a row; a partially
            # drawn frame is preferable to aborting the watch.
            pass
        stdscr.refresh()

    def loop(stdscr):
        # reset colors
        curses.use_default_colors()
        while True:
            out = subprocess.check_output(cmd, **kwargs)
            text = to_text(out)
            if matched(out, text):
                return out
            draw(stdscr, text)
            time.sleep(interval)

    yield curses.wrapper(loop)
if __name__ == '__main__':
    # Demo: keep listing the current directory once a second until an entry
    # containing "beans" shows up, then report it.
    stop_pattern = re.compile('beans')
    watcher = watch_output(['ls', '-l'], stop_regex=stop_pattern, interval=1)
    with watcher as out:
        print('found beans')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment