Skip to content

Instantly share code, notes, and snippets.

@comex
Created September 23, 2023 23:26
Show Gist options
  • Save comex/fd9748146ad838855d5a9d4b664aa603 to your computer and use it in GitHub Desktop.
Save comex/fd9748146ad838855d5a9d4b664aa603 to your computer and use it in GitHub Desktop.
Tool to test a shell's input latency
import sys, pty, os, time, signal, threading, string, random
# Everything after the script name is the command line of the shell to test.
sub_argv = sys.argv[1:]
if not sub_argv:
    # A usage error is not a program bug: exit with the message on stderr and
    # a non-zero status instead of dumping an exception traceback.
    sys.exit("usage: shell-latency-test.py shell-to-test (e.g.: shell-latency-test.py nu)")
# Spawn the shell in a pty which we own the controlling end of.
child_pid, fd = pty.fork()
if child_pid == 0:
    # This is the child process. Exec the shell; on success execvp never
    # returns because the process image is replaced.
    try:
        os.execvp(sub_argv[0], sub_argv)
    except OSError as exc:
        # execvp reports failure by raising, so a statement placed after it is
        # never reached. Report the error (stderr is the pty here, so the
        # parent sees it in its read log) and leave with os._exit so the
        # forked child does not continue running the parent's code.
        print(f"execvp failed: {exc}", file=sys.stderr, flush=True)
        os._exit(127)
# From here on we are the parent process.
# Build the 'command' we will type into the shell: ten distinct random ASCII
# letters. Using random text ensures we don't see a false positive due to
# e.g. nushell's history completion.
random_command = bytes(random.sample(string.ascii_letters.encode('ascii'), 10))
def input_thread():
    """Type the random command into the shell after a start-up pause.

    Stores the moment just before the write in the module-level
    ``wrote_random_command_time`` so the reader can compute the latency.
    """
    global wrote_random_command_time
    # Give the shell one second to finish initializing before typing.
    startup_pause_seconds = 1
    time.sleep(startup_pause_seconds)
    # Take the timestamp first, then send the command to the shell.
    sent_at = time.time()
    wrote_random_command_time = sent_at
    os.write(fd, random_command)
def output_thread():
    """Watch the shell's output until the random command is echoed back.

    Reads from the controlling end of the pty, prints every chunk read,
    answers cursor position requests, and prints the latency between the
    write of the command and the read that completed its echo. Returns when
    the echo has been seen or when the pty reports end of file.

    Raises:
        OSError: if reading from or writing to the pty fails for a reason
            other than the shell having gone away (EIO on read).
    """
    import errno

    # Log all the data we've ever read, so that we can search for strings
    # without it breaking if the string is split across multiple read calls.
    # This is O(n^2) but n is too small for it to matter.
    ever_read_data = b''
    # How many cursor position requests were seen last time.
    old_6n_count = 0
    while True:
        try:
            # Use raw reads to avoid any buffering.
            new_data = os.read(fd, 1048576)
        except OSError as exc:
            # On Linux, reading the controlling end after the shell has exited
            # fails with EIO instead of returning b''. Treat that as EOF.
            if exc.errno != errno.EIO:
                raise
            new_data = b''
        # Record the time of the read before doing any other work.
        read_time = time.time()
        if not new_data:
            # The shell went away without ever echoing the command.
            print('shell closed the pty before echoing the command')
            break
        ever_read_data += new_data
        print('read:', repr(new_data))
        # Did we see our command echoed back by the shell?
        if random_command in ever_read_data:
            # We shouldn't have read it back if we didn't write it yet. The
            # global only exists once the input thread has run, so look it up
            # defensively to get a clean assertion rather than a NameError.
            sent_time = globals().get('wrote_random_command_time')
            assert sent_time is not None
            # All done.
            print('done. latency:', read_time - sent_time)
            break
        # Otherwise, is there a cursor position request (ESC [ 6 n)?
        # We don't do a full terminal simulation, but we do need to respond to
        # these, or else nushell won't start up.
        new_6n_count = ever_read_data.count(b'\x1b[6n')
        for _ in range(new_6n_count - old_6n_count):
            # Always claim the cursor is at row 1, column 1.
            os.write(fd, b'\x1b[1;1R')
        old_6n_count = new_6n_count
# Type the command from a background thread while this thread watches the
# shell's output.
typing_thread = threading.Thread(target=input_thread)
typing_thread.start()
output_thread()
# The measurement is over; terminate the shell.
os.kill(child_pid, signal.SIGTERM)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment