human (Oct 26, 2024, 08:01 PM)
I want to write a Python program which reads from standard input, if standard input is available - if not it exits with a message
It reads the first 200 bytes from standard input into a string
Then it starts a subprocess running for "jq .blah" and pipes in the 200 bytes it already read, then pipes in the rest of the input from standard input in a stream without reading it all into memory
It streams the output of jq
If jq errors, it streams out the error messages to stderr
It returns the same exit code that jq returned
assistant (Oct 26, 2024, 08:01 PM)
I'll help you create a Python program that handles streaming input and subprocess management with these requirements.
```python
#!/usr/bin/env python3
import sys
import subprocess
import os


def check_stdin():
    """Check if there's data available on stdin."""
    return not sys.stdin.isatty()


def process_stream():
    """Process input stream through jq."""
    if not check_stdin():
        print("No input provided on stdin", file=sys.stderr)
        sys.exit(1)

    # Read first 200 bytes
    initial_chunk = sys.stdin.buffer.read(200)

    # Start jq subprocess
    process = subprocess.Popen(
        ['jq', '.blah'],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )

    # Function to handle streaming output
    def stream_output(pipe, target):
        while True:
            chunk = pipe.read(8192)  # Read in 8KB chunks
            if not chunk:
                break
            target.buffer.write(chunk)
            target.buffer.flush()

    try:
        # Write initial chunk to jq's stdin
        process.stdin.write(initial_chunk)

        # Stream the rest of stdin to jq
        while True:
            chunk = sys.stdin.buffer.read(8192)  # Read in 8KB chunks
            if not chunk:
                break
            process.stdin.write(chunk)

        # Close stdin to signal we're done writing
        process.stdin.close()

        # Create separate thread for stderr to avoid blocking
        import threading
        stderr_thread = threading.Thread(
            target=stream_output,
            args=(process.stderr, sys.stderr)
        )
        stderr_thread.start()

        # Stream stdout in main thread
        stream_output(process.stdout, sys.stdout)

        # Wait for stderr thread to complete
        stderr_thread.join()

        # Wait for process to complete and get exit code
        return_code = process.wait()
        sys.exit(return_code)

    except BrokenPipeError:
        # Handle case where output pipe is closed
        devnull = os.open(os.devnull, os.O_WRONLY)
        os.dup2(devnull, sys.stdout.fileno())
        sys.exit(1)
    except KeyboardInterrupt:
        # Handle Ctrl+C gracefully
        process.terminate()
        process.wait()
        sys.exit(130)
    finally:
        # Ensure process resources are cleaned up
        process.stdout.close()
        process.stderr.close()


if __name__ == '__main__':
    process_stream()
```

I've created a Python script that implements all the requested functionality. Here's how it works:
- Checks if stdin is available using `isatty()`
- Reads the first 200 bytes from stdin (fewer if the input is shorter) using `sys.stdin.buffer.read(200)`
- Starts a `jq .blah` subprocess with pipes for stdin, stdout, and stderr
- Streams the input in chunks:
  - First writes the initial 200 bytes
  - Then streams the rest of stdin in 8KB chunks
- Handles stdout and stderr streaming concurrently, using a separate thread for stderr
- Propagates the exit code from jq
- Includes error handling for:
  - Broken pipes
  - Keyboard interrupts
  - Missing stdin
  - Process cleanup
You can use it like this:
```
cat data.json | ./script.py
# or
./script.py < data.json
```

The script ensures memory efficiency by:
- Only buffering the initial 200 bytes
- Streaming all other data in 8KB chunks
- Handling stdout and stderr on separate threads (the main thread plus one worker) to prevent blocking
- Properly cleaning up resources in all cases
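As a quick smoke test of the exit-code propagation (assuming the script is saved as `script.py` and marked executable, and that `jq` is on your PATH; recent jq releases exit with status 2 on a parse error):

```
echo 'not json' | ./script.py
echo $?  # prints jq's exit status - 2 for a parse error in recent jq releases
```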
Let me know if you'd like me to explain any part in more detail or if you need any modifications!
human (Oct 26, 2024, 08:14 PM)
explain the thread bit with stderr, what is that for?
assistant (Oct 26, 2024, 08:14 PM)
Ah yes - the stderr threading is important to handle a specific issue that can occur with subprocesses. Let me explain:
When dealing with a subprocess that produces both stdout and stderr, if you try to read them sequentially (first stdout, then stderr or vice versa), you can run into a deadlock situation. This happens because:
- The subprocess might fill up its stderr buffer while writing error messages
- Once the buffer is full, the subprocess will block, waiting for someone to read from stderr
- But if your main program is busy reading from stdout and hasn't gotten to stderr yet, you're stuck
- Now both programs are waiting for each other - a classic deadlock
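Here's a minimal sketch of that deadlock-prone sequential pattern (illustrative only; the input and the buffer-size figure are assumptions - pipe buffer sizes vary by platform, with roughly 64KB being a common Linux default):

```python
import subprocess

# Spawn jq with all three streams piped, then read them one after
# another. If jq writes more to stderr than the OS pipe buffer holds
# while we are still draining stdout, jq blocks on its stderr write
# and the stdout loop below never reaches EOF.
process = subprocess.Popen(
    ['jq', '.blah'],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)
process.stdin.write(b'{"blah": 1}\n')
process.stdin.close()

while True:  # drain stdout to EOF first...
    if not process.stdout.read(8192):
        break

# ...and only then read stderr - with enough error output, the loop
# above never finishes, because jq is stuck waiting for someone to
# drain the stderr pipe.
stderr_data = process.stderr.read()
process.wait()
```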
That's why we use a separate thread to read stderr concurrently:
```python
# Create separate thread for stderr to avoid blocking
import threading
stderr_thread = threading.Thread(
    target=stream_output,
    args=(process.stderr, sys.stderr)
)
stderr_thread.start()

# Stream stdout in main thread
stream_output(process.stdout, sys.stdout)

# Wait for stderr thread to complete
stderr_thread.join()
```

This way:
- One thread continuously reads and forwards stdout
- Another thread continuously reads and forwards stderr
- Neither stream's buffer can fill up and block the subprocess
- All output (both normal and error messages) appears in real-time
If we didn't use threading, and jq produced a lot of error output while also producing regular output, the script could freeze up. This is particularly important for tools like jq that might produce both types of output simultaneously (for example, if it's processing a stream of JSON where some entries are valid and some aren't).
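You can see jq producing both kinds of output from a single run by feeding it a stream where the second document is invalid (the exact error text, and whether jq continues past a parse error, vary between jq versions):

```
printf '{"blah": 1}\nnot json\n' | jq .blah
# stdout: 1
# stderr: a parse error for the second document, plus a non-zero exit code
```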
The `stderr_thread.join()` at the end ensures we don't exit the program until all error messages have been fully processed.
human (Oct 26, 2024, 08:14 PM)
If there is a stderr I don't think it will be very long, so I don't want to do the thread thing
assistant (Oct 26, 2024, 08:14 PM)
Ah, good point! If we expect stderr to be minimal (which is likely true for jq errors), we can simplify this. Here's the modified version:
```python
#!/usr/bin/env python3
import sys
import subprocess
import os


def check_stdin():
    """Check if there's data available on stdin."""
    return not sys.stdin.isatty()


def process_stream():
    """Process input stream through jq."""
    if not check_stdin():
        print("No input provided on stdin", file=sys.stderr)
        sys.exit(1)

    # Read first 200 bytes
    initial_chunk = sys.stdin.buffer.read(200)

    # Start jq subprocess
    process = subprocess.Popen(
        ['jq', '.blah'],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )

    try:
        # Write initial chunk to jq's stdin
        process.stdin.write(initial_chunk)

        # Stream the rest of stdin to jq
        while True:
            chunk = sys.stdin.buffer.read(8192)  # Read in 8KB chunks
            if not chunk:
                break
            process.stdin.write(chunk)

        # Close stdin to signal we're done writing
        process.stdin.close()

        # Stream stdout
        while True:
            chunk = process.stdout.read(8192)
            if not chunk:
                break
            sys.stdout.buffer.write(chunk)
            sys.stdout.buffer.flush()

        # After stdout is done, read and forward any stderr
        stderr_data = process.stderr.read()
        if stderr_data:
            sys.stderr.buffer.write(stderr_data)
            sys.stderr.buffer.flush()

        # Wait for process to complete and get exit code
        return_code = process.wait()
        sys.exit(return_code)

    except BrokenPipeError:
        # Handle case where output pipe is closed
        devnull = os.open(os.devnull, os.O_WRONLY)
        os.dup2(devnull, sys.stdout.fileno())
        sys.exit(1)
    except KeyboardInterrupt:
        # Handle Ctrl+C gracefully
        process.terminate()
        process.wait()
        sys.exit(130)
    finally:
        # Ensure process resources are cleaned up
        process.stdout.close()
        process.stderr.close()


if __name__ == '__main__':
    process_stream()
```
The key changes are:
- Removed all the threading code
- Now we handle the streams sequentially:
  - First we write all input to jq
  - Then we read and forward all stdout
  - Finally we read and forward any stderr
- For stderr, we just do a single `read()` since we expect it to be small
- If there is any error output, it'll appear at the end, after all the regular output
This is much simpler and will work fine for the jq use case where stderr is typically just error messages that appear when something goes wrong. The only downside is that error messages won't appear until after all the stdout is processed, but that's probably acceptable here.
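For example (a hypothetical input, assuming the simplified script is saved as `script.py`), a stream that starts valid and then breaks off shows that ordering:

```
printf '{"blah": 1}\n{"broken' | ./script.py
# stdout: 1
# stderr, only after stdout is fully drained: jq's parse error
```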
https://simonwillison.net/2024/Oct/27/llm-jq/