Last active
October 5, 2023 02:51
-
-
Save justengel/a536738a5faa3d9932a60581f2ce2913 to your computer and use it in GitHub Desktop.
Replace print with a new terminal buffer screen
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import builtins | |
import time | |
import warnings | |
from functools import wraps | |
def skip_without_screen(func): | |
@wraps(func) | |
def decorated(*args, **kwargs): | |
if args[0].stdscr is None: | |
return | |
return func(*args, **kwargs) | |
return decorated | |
class ScreenPrint: | |
q_key = ord("q") | |
Q_key = ord("Q") | |
def __init__(self, pad_height: int = 0, pad_width: int = 0, scroll_step: int = 3, truncate: str = "..."): | |
self.orig_print = print | |
self.curses = None | |
self.stdscr = None | |
self.screen_height = 0 | |
self.screen_width = 0 | |
self.idx = 0 | |
self._last_idx = 0 | |
self.pad = None | |
self.pad_height = pad_height | |
self.pad_width = pad_width | |
self.pad_xpos = 0 | |
self.pad_ypos = 0 | |
self.truncate = truncate | |
self._exit = False | |
self._last_tick = 0 | |
self._last_interval = 0 | |
self.scroll_step = scroll_step | |
def register(self): | |
builtins.print = self.__call__ | |
def deregister(self): | |
builtins.print = self.orig_print | |
return self | |
def init(self): | |
try: | |
import curses | |
self.curses = curses | |
self.stdscr = curses.initscr() | |
curses.noecho() | |
curses.cbreak() | |
self.stdscr.keypad(True) | |
self.stdscr.nodelay(True) | |
try: | |
curses.start_color() | |
except: | |
pass | |
# Create the pad | |
self.screen_height, self.screen_width = self.stdscr.getmaxyx() | |
self._new_pad() | |
# Clear the screen | |
self.clear() | |
except (ImportError, Exception): | |
warnings.warn("Cannot make a screen!") | |
time.sleep(1) | |
return self | |
def start(self): | |
self.register() | |
self.init() | |
return self | |
def close(self): | |
self.deregister() | |
if self.curses: | |
pad, self.pad = self.pad, None | |
stdscr, self.stdscr = self.stdscr, None | |
cur, self.curses = self.curses, None | |
cur.endwin() | |
del pad | |
del stdscr | |
del cur | |
return self | |
def __del__(self): | |
try: | |
self.close() | |
except (AttributeError, Exception): | |
pass | |
@skip_without_screen | |
def _new_pad(self): | |
if self.pad_height == 0: | |
self.pad_height = self.screen_height * 10 | |
if self.pad_width == 0: | |
self.pad_width = self.screen_width * 100 | |
self.pad = self.curses.newpad(self.pad_height, self.pad_width) | |
self.pad.keypad(True) | |
self.pad.nodelay(True) | |
def _scroll_limit_x(self): | |
if self.pad_xpos < 0: | |
self.pad_xpos = 0 | |
elif self.pad_xpos > self.pad_height: | |
self.pad_xpos = self.pad_height | |
def _scroll_limit_y(self): | |
if self.pad_ypos < 0: | |
self.pad_ypos = 0 | |
elif self.pad_ypos > self.pad_height: | |
self.pad_ypos = self.pad_height | |
def scroll(self, x: int = 0, y: int = 0): | |
"""Increase the scroll amount.""" | |
self.pad_xpos += x | |
self.pad_ypos += y | |
self._scroll_limit_x() | |
self._scroll_limit_y() | |
self._pad_refresh() | |
def scroll_to(self, x: int = 0, y: int = 0): | |
self.pad_xpos = x | |
self.pad_ypos = y | |
self._scroll_limit_x() | |
self._scroll_limit_y() | |
self._pad_refresh() | |
@skip_without_screen | |
def clear(self): | |
self.pad.clear() | |
self.screen_height, self.screen_width = self.stdscr.getmaxyx() | |
if self.idx != 0: | |
self._last_idx = self.idx | |
self.idx = 0 | |
def resize(self): | |
change_pad = False | |
if self.pad_height <= self.screen_height: | |
change_pad = True | |
self.pad_height = 0 | |
elif self.pad_width <= self.screen_width: | |
change_pad = True | |
self.pad_width = 0 | |
if change_pad: | |
self._new_pad() | |
self._scroll_limit_x() | |
self._scroll_limit_y() | |
self._pad_refresh() | |
@skip_without_screen | |
def _pad_refresh(self): | |
self.pad.refresh(self.pad_xpos, self.pad_ypos, 0, 0, self.screen_height - 1, self.screen_width - 1) | |
# self.stdscr.refresh() | |
def tick(self, interval: float = 0) -> bool: | |
"""Get quit and scroll positions. | |
If the interval is given automatically clear for a new screen. | |
""" | |
now = time.time() | |
interval_timeout = (now - self._last_interval) > interval | |
if interval and interval_timeout: | |
self._last_interval = now | |
self.clear() | |
if self.pad: | |
ch = self.pad.getch() | |
if ch == -1 or (now - self._last_tick < 1 / 120): | |
return interval_timeout | |
self._last_tick = now | |
keyname = self.curses.keyname(ch) | |
scroll_x = 0 | |
scroll_y = 0 | |
if ch == self.q_key or ch == self.Q_key: | |
self.quit() | |
elif ch == self.curses.KEY_RESIZE: | |
self.screen_height, self.screen_width = self.stdscr.getmaxyx() | |
self.resize() | |
elif ch == self.curses.KEY_UP: | |
scroll_x -= 1 | |
elif ch == self.curses.KEY_DOWN: | |
scroll_x += 1 | |
elif ch == self.curses.KEY_PPAGE or keyname == b'KEY_SUP': | |
scroll_x -= self.screen_height | |
elif ch == self.curses.KEY_NPAGE or keyname == b'KEY_SDOWN': | |
scroll_x += self.screen_height | |
elif ch == self.curses.KEY_HOME or ch == self.curses.KEY_SHOME: | |
self.scroll_to(0, 0) | |
elif ch == self.curses.KEY_END or ch == self.curses.KEY_SEND: | |
self.scroll_to(max(self.idx, self._last_idx) - self.screen_height, 0) | |
elif ch == self.curses.KEY_LEFT: | |
scroll_y -= 1 | |
elif ch == self.curses.KEY_RIGHT: | |
scroll_y += 1 | |
elif ch == self.curses.KEY_SLEFT: | |
scroll_y -= self.screen_width | |
elif ch == self.curses.KEY_SRIGHT: | |
scroll_y += self.screen_width | |
if scroll_x or scroll_y: | |
self.scroll(scroll_x, scroll_y) | |
return interval_timeout | |
@skip_without_screen | |
def print_msg(self, *args, **kwargs): | |
s = " ".join((str(a) for a in args)) | |
for line in s.replace("\r\n", "\n").split("\n"): | |
if self.idx >= self.pad_height: | |
break | |
elif self.idx >= self.pad_height - 1: | |
self.pad.addstr(self.idx, 0, self.truncate[: self.pad_width - 1]) | |
self.idx += 1 | |
break | |
self.pad.addstr(self.idx, 0, line[: self.pad_width - 1]) | |
self.idx += 1 | |
self._pad_refresh() | |
def should_quit(self): | |
return self._exit | |
def quit(self): | |
self._exit = True | |
self.close() | |
sys.exit(0) | |
def __call__(self, *args, **kwargs): | |
if self.stdscr is None or kwargs.get("file", None) is not None: | |
self.orig_print(*args, **kwargs) | |
return | |
self.print_msg(*args, **kwargs) | |
def __enter__(self): | |
self.start() | |
return self | |
def __exit__(self, exc_type, exc_val, exc_tb): | |
self.close() | |
return exc_type == KeyboardInterrupt | |
def main(): | |
import pandas as pd | |
import numpy as np | |
data_height = 400 | |
data_width = 100 | |
period = 3 | |
with ScreenPrint() as scrn: | |
while True: | |
if scrn.tick(period): | |
columns = ["col_" + str(i) for i in range(data_width)] | |
df = pd.DataFrame( | |
np.random.randint(0, 100, size=(data_height, data_width)), columns=columns | |
) | |
print("My Data:") | |
print(df.to_string()) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment