@simonw
Created October 27, 2024 03:19

Python program to stream input to subprocess

human (Oct 26, 2024, 08:01 PM)

I want to write a Python program which reads from standard input, if standard input is available - if not it exits with a message

It reads the first 200 bytes from standard input into a string

Then it starts a subprocess running for "jq .blah" and pipes in the 200 bytes it already read, then pipes in the rest of the input from standard input in a stream without reading it all into memory

It streams the output of jq

If jq errors, it streams out the error messages to stderr

It returns the same exit code that jq returned

assistant (Oct 26, 2024, 08:01 PM)

I'll help you create a Python program that handles streaming input and subprocess management with these requirements.

Stream Processing with JQ

#!/usr/bin/env python3
import sys
import subprocess
import threading
import os

def check_stdin():
    """Return True if stdin is not an interactive terminal (i.e. it is a pipe or a redirected file)."""
    return not sys.stdin.isatty()

def process_stream():
    """Process input stream through jq."""
    if not check_stdin():
        print("No input provided on stdin", file=sys.stderr)
        sys.exit(1)
    
    # Read first 200 bytes
    initial_chunk = sys.stdin.buffer.read(200)
    
    # Start jq subprocess
    process = subprocess.Popen(
        ['jq', '.blah'],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    
    # Function to handle streaming output
    def stream_output(pipe, target):
        while True:
            chunk = pipe.read(8192)  # Read in 8KB chunks
            if not chunk:
                break
            target.buffer.write(chunk)
            target.buffer.flush()
    
    try:
        # Write initial chunk to jq's stdin
        process.stdin.write(initial_chunk)
        
        # Stream the rest of stdin to jq
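        # Note: jq's stdout is not read until after all input has been
        # written, so if jq emits more output than the OS pipe buffer holds
        # (commonly 64 KiB on Linux) before stdin is closed, this write loop
        # can stall.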
        while True:
            chunk = sys.stdin.buffer.read(8192)  # Read in 8KB chunks
            if not chunk:
                break
            process.stdin.write(chunk)
        
        # Close stdin to signal we're done writing
        process.stdin.close()
        
        # Create separate thread for stderr to avoid blocking
        stderr_thread = threading.Thread(
            target=stream_output,
            args=(process.stderr, sys.stderr)
        )
        stderr_thread.start()
        
        # Stream stdout in main thread
        stream_output(process.stdout, sys.stdout)
        
        # Wait for stderr thread to complete
        stderr_thread.join()
        
        # Wait for process to complete and get exit code
        return_code = process.wait()
        sys.exit(return_code)
        
    except BrokenPipeError:
        # Handle case where output pipe is closed
        devnull = os.open(os.devnull, os.O_WRONLY)
        os.dup2(devnull, sys.stdout.fileno())
        sys.exit(1)
    except KeyboardInterrupt:
        # Handle Ctrl+C gracefully
        process.terminate()
        process.wait()
        sys.exit(130)
    finally:
        # Ensure process resources are cleaned up
        process.stdout.close()
        process.stderr.close()

if __name__ == '__main__':
    process_stream()

I've created a Python script that implements all the requested functionality. Here's how it works:

  1. Checks that stdin is not an interactive terminal using isatty()
  2. Reads the first 200 bytes from stdin using sys.stdin.buffer.read(200) (fewer if the input is shorter)
  3. Starts a jq .blah subprocess with pipes for stdin, stdout, and stderr
  4. Streams the input in chunks:
    • First writes the initial 200 bytes
    • Then streams the rest of stdin in 8KB chunks
  5. Handles stdout and stderr streaming concurrently using a separate thread for stderr
  6. Propagates the exit code from jq
  7. Includes error handling for:
    • Broken pipes
    • Keyboard interrupts
    • Missing stdin
    • Process cleanup

You can use it like this:

cat data.json | ./script.py
# or
./script.py < data.json
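
Because the script propagates jq's exit code, failures are visible to the shell. As a quick check (assuming jq's usual behavior of exiting non-zero on a parse error):

echo 'not json' | ./script.py
echo $?  # prints jq's non-zero exit code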

The script ensures memory efficiency by:

  • Only buffering the initial 200 bytes
  • Streaming all other data in 8KB chunks
  • Handling stdout in the main thread and stderr in a separate thread to prevent blocking
  • Properly cleaning up resources in all cases

Let me know if you'd like me to explain any part in more detail or if you need any modifications!

human (Oct 26, 2024, 08:14 PM)

explain the thread bit with stderr, what is that for?

assistant (Oct 26, 2024, 08:14 PM)

Ah yes - the stderr threading is important to handle a specific issue that can occur with subprocesses. Let me explain:

When dealing with a subprocess that produces both stdout and stderr, if you try to read them sequentially (first stdout, then stderr or vice versa), you can run into a deadlock situation. This happens because:

  1. The subprocess might fill up its stderr buffer while writing error messages
  2. Once the buffer is full, the subprocess will block, waiting for someone to read from stderr
  3. But if your main program is busy reading from stdout and hasn't gotten to stderr yet, you're stuck
  4. Now both programs are waiting for each other - a classic deadlock
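
A minimal sketch of the deadlock-prone pattern, for illustration (not code from the script above):

# Deadlock-prone: drain stdout to EOF before ever touching stderr
out = process.stdout.read()   # blocks until jq closes its stdout...
err = process.stderr.read()   # ...while jq may be blocked writing to a full stderr pipe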

That's why we use a separate thread to read stderr concurrently:

# Create separate thread for stderr to avoid blocking
stderr_thread = threading.Thread(
    target=stream_output,
    args=(process.stderr, sys.stderr)
)
stderr_thread.start()

# Stream stdout in main thread
stream_output(process.stdout, sys.stdout)

# Wait for stderr thread to complete
stderr_thread.join()

This way:

  • One thread continuously reads and forwards stdout
  • Another thread continuously reads and forwards stderr
  • Neither stream's buffer can fill up and block the subprocess
  • All output (both normal and error messages) appears in real-time

If we didn't use threading, and jq produced a lot of error output while also producing regular output, the script could freeze up. This is particularly important for tools like jq that might produce both types of output simultaneously (for example, if it's processing a stream of JSON where some entries are valid and some aren't).

The stderr_thread.join() at the end ensures we don't collect jq's exit code and exit until all error messages have been fully forwarded.

human (Oct 26, 2024, 08:14 PM)

If there is a stderr I don't think it will be very long, so I don't want to do the thread thing

assistant (Oct 26, 2024, 08:14 PM)

Ah, good point! If we expect stderr to be minimal (which is likely true for jq errors), we can simplify this. Here's the modified version:


#!/usr/bin/env python3
import sys
import subprocess
import os

def check_stdin():
    """Return True if stdin is not an interactive terminal (i.e. it is a pipe or a redirected file)."""
    return not sys.stdin.isatty()

def process_stream():
    """Process input stream through jq."""
    if not check_stdin():
        print("No input provided on stdin", file=sys.stderr)
        sys.exit(1)
    
    # Read first 200 bytes
    initial_chunk = sys.stdin.buffer.read(200)
    
    # Start jq subprocess
    process = subprocess.Popen(
        ['jq', '.blah'],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    
    try:
        # Write initial chunk to jq's stdin
        process.stdin.write(initial_chunk)
        
        # Stream the rest of stdin to jq
        while True:
            chunk = sys.stdin.buffer.read(8192)  # Read in 8KB chunks
            if not chunk:
                break
            process.stdin.write(chunk)
        
        # Close stdin to signal we're done writing
        process.stdin.close()
        
        # Stream stdout
        while True:
            chunk = process.stdout.read(8192)
            if not chunk:
                break
            sys.stdout.buffer.write(chunk)
            sys.stdout.buffer.flush()
        
        # After stdout is done, read and forward any stderr
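        # (A single read is safe as long as jq's error output fits in the OS
        # pipe buffer, commonly 64 KiB on Linux; a much larger error stream
        # would already have stalled jq during the stdout loop above.)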
        stderr_data = process.stderr.read()
        if stderr_data:
            sys.stderr.buffer.write(stderr_data)
            sys.stderr.buffer.flush()
        
        # Wait for process to complete and get exit code
        return_code = process.wait()
        sys.exit(return_code)
        
    except BrokenPipeError:
        # Handle case where output pipe is closed
        devnull = os.open(os.devnull, os.O_WRONLY)
        os.dup2(devnull, sys.stdout.fileno())
        sys.exit(1)
    except KeyboardInterrupt:
        # Handle Ctrl+C gracefully
        process.terminate()
        process.wait()
        sys.exit(130)
    finally:
        # Ensure process resources are cleaned up
        process.stdout.close()
        process.stderr.close()

if __name__ == '__main__':
    process_stream()

The key changes are:

  1. Removed all the threading code
  2. Now we handle the streams sequentially:
    • First we write all input to jq
    • Then we read and forward all stdout
    • Finally we read and forward any stderr
  3. For stderr, we just do a single read() since we expect it to be small
    • If there is any error output, it'll appear at the end after all the regular output

This is much simpler and works fine for the jq use case, where stderr is typically just a short error message when something goes wrong. There are two trade-offs: error messages won't appear until all of stdout has been processed, and the sequential reads are only safe while jq's error output fits in the OS pipe buffer. Both are acceptable here.
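
If the pipe-buffer limit ever became a concern, there is a middle ground that still avoids threads: multiplex jq's stdout and stderr with the standard-library selectors module. A minimal sketch, assuming a Unix platform (selectors does not work with pipes on Windows) and the same process object as above:

import selectors
import sys

def forward_streams(process):
    """Forward the subprocess's stdout/stderr to ours as data arrives."""
    sel = selectors.DefaultSelector()
    sel.register(process.stdout, selectors.EVENT_READ, sys.stdout.buffer)
    sel.register(process.stderr, selectors.EVENT_READ, sys.stderr.buffer)
    while sel.get_map():  # loop until both streams reach EOF
        for key, _ in sel.select():
            chunk = key.fileobj.read1(8192)  # one read; won't block after select
            if not chunk:
                sel.unregister(key.fileobj)  # this stream hit EOF
                continue
            key.data.write(chunk)
            key.data.flush()

This would replace both the stdout loop and the trailing stderr read in the script above.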
