@comex
Last active October 8, 2023 00:37

Revisions

  1. comex revised this gist Oct 8, 2023. 1 changed file with 1 addition and 0 deletions.
    1 change: 1 addition & 0 deletions shell-latency-test-infinite.py
    @@ -1,3 +1,4 @@
    +#!/usr/bin/env python3
     import sys, pty, os, time, threading, queue, argparse

     ap = argparse.ArgumentParser()
  2. comex revised this gist Oct 7, 2023. 1 changed file with 18 additions and 9 deletions.
    27 changes: 18 additions & 9 deletions shell-latency-test-infinite.py
    @@ -1,14 +1,22 @@
    -import sys, pty, os, time, threading, queue
    +import sys, pty, os, time, threading, queue, argparse

    -sub_argv = sys.argv[1:]
    -if not sub_argv:
    -    raise Exception("usage: shell-latency-test-infinite.py shell-to-test (e.g.: shell-latency-test-infinite.py nu)")
    +ap = argparse.ArgumentParser()
    +parser = argparse.ArgumentParser(
    +    description='Tests the input latency of a shell',
    +    usage='usage: shell-latency-test-infinite.py [--sleep] [--] argv-to-execute...',
    +)
    +parser.add_argument('--sleep',
    +    action='store_true',
    +    help='''only type once every 1s instead of constantly - results in higher
    +observed latency, probably due to OS overhead''')
    +parser.add_argument('argv_to_execute', nargs='+', help='shell command to execute')
    +args = parser.parse_args()

     # Spawn the shell in a pty which we own the controlling end of.
     child_pid, fd = pty.fork()
     if child_pid == 0:
         # This is the child process. Exec the shell.
    -    os.execvp(sub_argv[0], sub_argv)
    +    os.execvp(args.argv_to_execute[0], args.argv_to_execute)
         raise Exception("execvp failed")
     # Otherwise, we're the parent.

    @@ -21,14 +29,13 @@ def read_thread():
         while new_data := os.read(fd, 1048576):
             for q in data_queues:
                 q.put(new_data)
    -    for q in data_queues:
    -        q.put(None)
     threading.Thread(target=read_thread, daemon=True).start()

     def position_request_thread():
         '''Handle position requests.'''
         buf = b''
    -    while new_data := data_queue_position_requests.get():
    +    while True:
    +        new_data = data_queue_position_requests.get()
             buf = buf[-3:] + new_data
             for _ in range(buf.count(b'\x1b[6n')):
                 os.write(fd, b'\x1b[1;1R')
    @@ -69,8 +76,10 @@ def join_queue(q):
         new_data = data_queue_main.get()
         duration_us = (time.time() - time_pre) * 1_000_000
         durations.append(duration_us)
    +    if args.sleep:
    +        time.sleep(1)
         total_duration = time.time() - durations_clear_time
    -    if total_duration >= 1.0:
    +    if total_duration >= 1.0 or args.sleep:
             average_duration_us = sum(durations) / len(durations)
             print(f'{len(durations)} rounds in {total_duration:2.1f}s, average latency {average_duration_us:5,.0f}us')
             durations = []
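
    The sketch below is an editorial illustration, not part of the gist: it shows how the argparse setup introduced in this revision splits a command line. Everything after "--" becomes the shell command to execute, while --sleep is a standalone boolean flag. The shell name "nu" is taken from the original usage message; the "--login" flag is purely hypothetical, included only to show that arguments after "--" pass through untouched.

        # Illustration only (not from the gist): how the parser added in this
        # revision splits "--sleep -- nu --login" into the flag and the argv
        # to execute. "nu" comes from the original usage message; "--login"
        # is a hypothetical flag for the shell under test.
        import argparse

        parser = argparse.ArgumentParser()
        parser.add_argument('--sleep', action='store_true')
        parser.add_argument('argv_to_execute', nargs='+')

        args = parser.parse_args(['--sleep', '--', 'nu', '--login'])
        print(args.sleep)            # True
        print(args.argv_to_execute)  # ['nu', '--login']

    Judging from the diff, --sleep also changes the reporting cadence: the loop sleeps one second after each keystroke and prints an average after every round rather than once per second of continuous typing.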
  3. comex created this gist Oct 7, 2023.
    78 changes: 78 additions & 0 deletions shell-latency-test-infinite.py
    @@ -0,0 +1,78 @@
    +import sys, pty, os, time, threading, queue
    +
    +sub_argv = sys.argv[1:]
    +if not sub_argv:
    +    raise Exception("usage: shell-latency-test-infinite.py shell-to-test (e.g.: shell-latency-test-infinite.py nu)")
    +
    +# Spawn the shell in a pty which we own the controlling end of.
    +child_pid, fd = pty.fork()
    +if child_pid == 0:
    +    # This is the child process. Exec the shell.
    +    os.execvp(sub_argv[0], sub_argv)
    +    raise Exception("execvp failed")
    +# Otherwise, we're the parent.
    +
    +data_queue_main = queue.SimpleQueue()
    +data_queue_position_requests = queue.SimpleQueue()
    +data_queues = [data_queue_main, data_queue_position_requests]
    +
    +def read_thread():
    +    '''Constantly read input from the shell and send it to each of the queues.'''
    +    while new_data := os.read(fd, 1048576):
    +        for q in data_queues:
    +            q.put(new_data)
    +    for q in data_queues:
    +        q.put(None)
    +threading.Thread(target=read_thread, daemon=True).start()
    +
    +def position_request_thread():
    +    '''Handle position requests.'''
    +    buf = b''
    +    while new_data := data_queue_position_requests.get():
    +        buf = buf[-3:] + new_data
    +        for _ in range(buf.count(b'\x1b[6n')):
    +            os.write(fd, b'\x1b[1;1R')
    +threading.Thread(target=position_request_thread, daemon=True).start()
    +
    +def join_queue(q):
    +    '''Remove all bytes objects currently in the queue and return them concatenated.'''
    +    ret = b''
    +    while True:
    +        try:
    +            ret += q.get_nowait()
    +        except queue.Empty:
    +            break
    +    return ret
    +
    +# Wait one second to ensure the shell is done initializing.
    +time.sleep(1)
    +startup_data = join_queue(data_queue_main)
    +print('startup:', startup_data)
    +
    +durations = []
    +durations_clear_time = time.time()
    +i = 0
    +while True:
    +    # Did we get any extra data, suggesting the below assumption is violated?
    +    junk = join_queue(data_queue_main)
    +    if junk:
    +        print('junk:', junk)
    +    # Alternately input 'a' and input a backspace to erase the 'a'.
    +    os.write(fd, b'\b' if i & 1 else b'a')
    +    # The timing isn't exact (e.g. there might be a delay between the write
    +    # and calculating the time here), but any error is tens of microseconds
    +    # at most.
    +    time_pre = time.time()
    +    # Assume the first read contains the full response to the input and the
    +    # shell won't be sending anything more. If we didn't assume this, we
    +    # couldn't immediately continue with more keystrokes.
    +    new_data = data_queue_main.get()
    +    duration_us = (time.time() - time_pre) * 1_000_000
    +    durations.append(duration_us)
    +    total_duration = time.time() - durations_clear_time
    +    if total_duration >= 1.0:
    +        average_duration_us = sum(durations) / len(durations)
    +        print(f'{len(durations)} rounds in {total_duration:2.1f}s, average latency {average_duration_us:5,.0f}us')
    +        durations = []
    +        durations_clear_time = time.time()
    +    i += 1
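
A closing note on position_request_thread, as an editorial aside rather than part of the gist: a shell may ask its terminal where the cursor is by writing the escape sequence ESC [ 6 n, and this script, playing the terminal's role, always answers ESC [ 1 ; 1 R, i.e. row 1, column 1. Because the 4-byte query can arrive split across two reads from the pty, the thread keeps the last 3 bytes of the previous chunk before counting queries. A minimal sketch of that buffering, using a made-up chunk split:

    # Illustration only (not from the gist): why buf = buf[-3:] + new_data still
    # finds a cursor-position query that was split across two reads. The query
    # b'\x1b[6n' is 4 bytes, so carrying over the last 3 bytes of the previous
    # chunk guarantees a straddling query shows up intact, and 3 carried bytes
    # can never contain a whole query, so nothing is counted twice.
    chunks = [b'prompt \x1b[', b'6n more output']  # hypothetical split
    buf = b''
    queries = 0
    for new_data in chunks:
        buf = buf[-3:] + new_data
        queries += buf.count(b'\x1b[6n')
    print(queries)  # 1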