Skip to content

Instantly share code, notes, and snippets.

@IngoMeyer441
Last active January 30, 2019 16:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save IngoMeyer441/d6715e022bbc85354ebf770d7489e194 to your computer and use it in GitHub Desktop.
Save IngoMeyer441/d6715e022bbc85354ebf770d7489e194 to your computer and use it in GitHub Desktop.
less wrapper that only calls less if the output does not fit into one terminal page
#!/usr/bin/env python3
import errno
import locale
import os
import subprocess
import sys
from math import ceil
from typing import Dict, IO, Iterator, List, Optional, Tuple # noqa: F401 # pylint: disable=unused-import
BLOCK_SIZE = 1024
def get_terminal_size() -> Optional[Tuple[int, int]]:
"""Adopted from https://gist.github.com/jtriley/1108174
"""
def ioctl_GWINSZ(fd: int) -> Optional[Tuple[int, int]]:
try:
import fcntl
import struct
import termios
cr = struct.unpack("hh", fcntl.ioctl(fd, termios.TIOCGWINSZ, b"1234"))
if len(cr) < 2:
return None
return int(cr[0]), int(cr[1])
except: # noqa: W702 # pylint: disable=bare-except
return None
cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
if not cr:
try:
fd = os.open(os.ctermid(), os.O_RDONLY)
cr = ioctl_GWINSZ(fd)
os.close(fd)
except: # noqa: W702 # pylint: disable=bare-except
pass
if not cr:
try:
cr = (int(os.environ["LINES"]), int(os.environ["COLUMNS"]))
except: # noqa: W702 # pylint: disable=bare-except
return None
return cr
def get_locale_encoding() -> str:
encoding = locale.getlocale()[1]
if encoding is None:
encoding = "ascii"
return encoding
def decode(byte_string: bytes) -> str:
encoding = get_locale_encoding()
return byte_string.decode(encoding, "replace")
def encode(unicode_string: str) -> bytes:
encoding = get_locale_encoding()
return unicode_string.encode(encoding, "replace")
def gen_lines(f: IO[bytes]) -> Iterator[str]:
byte_block = b""
try:
while True:
try:
current_bytes = f.read(BLOCK_SIZE)
except OSError as e:
if e.errno != errno.EIO:
raise
break # EIO means EOF on some systems
if not current_bytes:
break
byte_block += current_bytes
if current_bytes.find(b"\n") >= 0:
lines = byte_block.split(b"\n")
for line in lines[:-1]:
yield decode(line)
byte_block = lines[-1]
finally:
f.close()
if byte_block:
yield decode(byte_block)
def view_byte_file(f: IO[bytes]) -> int:
line_generator = gen_lines(f)
terminal_size = get_terminal_size()
if terminal_size is None:
# If the terminal size cannot be determined, this script probably does not run in a tty -> do not use a pager
for line in line_generator:
print(line)
returncode = 0
else:
terminal_height, terminal_width = terminal_size
lines = [] # type: List[str]
line_count = 0
for line in line_generator:
lines.append(line)
line_count += int(ceil(len(line) / terminal_width))
if line_count >= terminal_height:
output_fits_on_one_page = False
break
else:
output_fits_on_one_page = True
if output_fits_on_one_page:
sys.stdout.write("{}\n".format("\n".join(lines)))
sys.stdout.flush()
returncode = 0
else:
process_less = subprocess.Popen(["less", "-R", "-+F", "-+X"], stdin=subprocess.PIPE)
try:
process_less.stdin.write(encode("{}\n".format("\n".join(lines))))
process_less.stdin.flush()
for line in line_generator:
process_less.stdin.write(encode("{}\n".format(line)))
process_less.stdin.flush()
process_less.stdin.close()
except BrokenPipeError:
pass
returncode = process_less.wait()
return returncode
def main() -> None:
if len(sys.argv) > 1:
print("Currently, only piped input is supported.", file=sys.stderr)
sys.exit(1)
stdin_byte_file = sys.stdin.buffer
sys.exit(view_byte_file(stdin_byte_file))
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment