Last active
October 8, 2023 00:37
Revisions
-
comex revised this gist
Oct 8, 2023 . 1 changed file with 1 addition and 0 deletions.There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,3 +1,4 @@ #!/usr/bin/env python3 import sys, pty, os, time, threading, queue, argparse ap = argparse.ArgumentParser() -
comex revised this gist
Oct 7, 2023 . 1 changed file with 18 additions and 9 deletions.There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,14 +1,22 @@ import sys, pty, os, time, threading, queue, argparse ap = argparse.ArgumentParser() parser = argparse.ArgumentParser( description='Tests the input latency of a shell', usage='usage: shell-latency-test-infinite.py [--sleep] [--] argv-to-execute...', ) parser.add_argument('--sleep', action='store_true', help='''only type once every 1s instead of constantly - results in higher observed latency, probably due to OS overhead''') parser.add_argument('argv_to_execute', nargs='+', help='shell command to execute') args = parser.parse_args() # Spawn the shell in a pty which we own the controlling end of. child_pid, fd = pty.fork() if child_pid == 0: # This is the child process. Exec the shell. os.execvp(args.argv_to_execute[0], args.argv_to_execute) raise Exception("execvp failed") # Otherwise, we're the parent. @@ -21,14 +29,13 @@ def read_thread(): while new_data := os.read(fd, 1048576): for q in data_queues: q.put(new_data) threading.Thread(target=read_thread, daemon=True).start() def position_request_thread(): '''Handle position requests.''' buf = b'' while True: new_data = data_queue_position_requests.get() buf = buf[-3:] + new_data for _ in range(buf.count(b'\x1b[6n')): os.write(fd, b'\x1b[1;1R') @@ -69,8 +76,10 @@ def join_queue(q): new_data = data_queue_main.get() duration_us = (time.time() - time_pre) * 1_000_000 durations.append(duration_us) if args.sleep: time.sleep(1) total_duration = time.time() - durations_clear_time if total_duration >= 1.0 or args.sleep: average_duration_us = sum(durations) / len(durations) print(f'{len(durations)} rounds in {total_duration:2.1f}s, average latency {average_duration_us:5,.0f}us') durations = [] -
comex created this gist
Oct 7, 2023 .There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,78 @@ import sys, pty, os, time, threading, queue sub_argv = sys.argv[1:] if not sub_argv: raise Exception("usage: shell-latency-test-infinite.py shell-to-test (e.g.: shell-latency-test-infinite.py nu)") # Spawn the shell in a pty which we own the controlling end of. child_pid, fd = pty.fork() if child_pid == 0: # This is the child process. Exec the shell. os.execvp(sub_argv[0], sub_argv) raise Exception("execvp failed") # Otherwise, we're the parent. data_queue_main = queue.SimpleQueue() data_queue_position_requests = queue.SimpleQueue() data_queues = [data_queue_main, data_queue_position_requests] def read_thread(): '''Constantly read input from the shell and send it to each of the queues.''' while new_data := os.read(fd, 1048576): for q in data_queues: q.put(new_data) for q in data_queues: q.put(None) threading.Thread(target=read_thread, daemon=True).start() def position_request_thread(): '''Handle position requests.''' buf = b'' while new_data := data_queue_position_requests.get(): buf = buf[-3:] + new_data for _ in range(buf.count(b'\x1b[6n')): os.write(fd, b'\x1b[1;1R') threading.Thread(target=position_request_thread, daemon=True).start() def join_queue(q): '''Remove all bytes objects currently in the queue and return them concatenated.''' ret = b'' while True: try: ret += q.get_nowait() except queue.Empty: break return ret # Wait one second to ensure the shell is done initializing. time.sleep(1) startup_data = join_queue(data_queue_main) print('startup:', startup_data) durations = [] durations_clear_time = time.time() i = 0 while True: # Did we get any extra data, suggesting the below assumption is violated? junk = join_queue(data_queue_main) if junk: print('junk:', junk) # Alternately input 'a' and input a backspace to erase the 'a'. os.write(fd, b'\b' if i & 1 else b'a') # The timing isn't exact (e.g. there might be a delay between the write # and calculating the time here), but any error is tens of microseconds # at most. time_pre = time.time() # Assume the first read contains the full response to the input and the # shell won't be sending anything more. If we didn't assume this, we # couldn't immediately continue with more keystrokes. new_data = data_queue_main.get() duration_us = (time.time() - time_pre) * 1_000_000 durations.append(duration_us) total_duration = time.time() - durations_clear_time if total_duration >= 1.0: average_duration_us = sum(durations) / len(durations) print(f'{len(durations)} rounds in {total_duration:2.1f}s, average latency {average_duration_us:5,.0f}us') durations = [] durations_clear_time = time.time() i += 1