Skip to content

Instantly share code, notes, and snippets.

@comex
Last active October 8, 2023 00:37
Show Gist options
  • Save comex/380653fefb3312b24da5d66ed5ee5be0 to your computer and use it in GitHub Desktop.
Save comex/380653fefb3312b24da5d66ed5ee5be0 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import sys, pty, os, time, threading, queue, argparse
ap = argparse.ArgumentParser()
parser = argparse.ArgumentParser(
description='Tests the input latency of a shell',
usage='usage: shell-latency-test-infinite.py [--sleep] [--] argv-to-execute...',
)
parser.add_argument('--sleep',
action='store_true',
help='''only type once every 1s instead of constantly - results in higher
observed latency, probably due to OS overhead''')
parser.add_argument('argv_to_execute', nargs='+', help='shell command to execute')
args = parser.parse_args()
# Spawn the shell in a pty which we own the controlling end of.
child_pid, fd = pty.fork()
if child_pid == 0:
# This is the child process. Exec the shell.
os.execvp(args.argv_to_execute[0], args.argv_to_execute)
raise Exception("execvp failed")
# Otherwise, we're the parent.
data_queue_main = queue.SimpleQueue()
data_queue_position_requests = queue.SimpleQueue()
data_queues = [data_queue_main, data_queue_position_requests]
def read_thread():
'''Constantly read input from the shell and send it to each of the queues.'''
while new_data := os.read(fd, 1048576):
for q in data_queues:
q.put(new_data)
threading.Thread(target=read_thread, daemon=True).start()
def position_request_thread():
'''Handle position requests.'''
buf = b''
while True:
new_data = data_queue_position_requests.get()
buf = buf[-3:] + new_data
for _ in range(buf.count(b'\x1b[6n')):
os.write(fd, b'\x1b[1;1R')
threading.Thread(target=position_request_thread, daemon=True).start()
def join_queue(q):
'''Remove all bytes objects currently in the queue and return them concatenated.'''
ret = b''
while True:
try:
ret += q.get_nowait()
except queue.Empty:
break
return ret
# Wait one second to ensure the shell is done initializing.
time.sleep(1)
startup_data = join_queue(data_queue_main)
print('startup:', startup_data)
durations = []
durations_clear_time = time.time()
i = 0
while True:
# Did we get any extra data, suggesting the below assumption is violated?
junk = join_queue(data_queue_main)
if junk:
print('junk:', junk)
# Alternately input 'a' and input a backspace to erase the 'a'.
os.write(fd, b'\b' if i & 1 else b'a')
# The timing isn't exact (e.g. there might be a delay between the write
# and calculating the time here), but any error is tens of microseconds
# at most.
time_pre = time.time()
# Assume the first read contains the full response to the input and the
# shell won't be sending anything more. If we didn't assume this, we
# couldn't immediately continue with more keystrokes.
new_data = data_queue_main.get()
duration_us = (time.time() - time_pre) * 1_000_000
durations.append(duration_us)
if args.sleep:
time.sleep(1)
total_duration = time.time() - durations_clear_time
if total_duration >= 1.0 or args.sleep:
average_duration_us = sum(durations) / len(durations)
print(f'{len(durations)} rounds in {total_duration:2.1f}s, average latency {average_duration_us:5,.0f}us')
durations = []
durations_clear_time = time.time()
i += 1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment