Skip to content

Instantly share code, notes, and snippets.

@tos-kamiya
Last active December 5, 2021 14:28
Show Gist options
  • Save tos-kamiya/a235ab6114589099f9ff59104d9b82db to your computer and use it in GitHub Desktop.
Save tos-kamiya/a235ab6114589099f9ff59104d9b82db to your computer and use it in GitHub Desktop.
Non-blocking read from child process's stdout
# ref: https://stackoverflow.com/questions/375427/a-non-blocking-read-on-a-subprocess-pipe-in-python
# The answer in that question produced the warning below and did not work correctly for me, so this version opens the pipes unbuffered (bufsize=0) instead of line-buffered.
# /usr/lib/python3.8/subprocess.py:842: RuntimeWarning: line buffering (buffering=1) isn't supported in binary mode, the default buffer size will be used
from typing import *
import sys
from subprocess import PIPE, Popen
from threading import Thread
from queue import Queue, Empty
ON_POSIX = 'posix' in sys.builtin_module_names
def enqueue_output(out, queue):
    """Copy every line read from *out* into *queue*, then close *out*.

    Intended to run in a background thread so that the blocking
    ``readline`` calls never stall the caller.

    Args:
        out: Binary stream exposing ``readline()`` and ``close()``.
        queue: Object exposing ``put(item)``; receives each line as bytes.
    """
    try:
        # readline() returns b'' at end of stream, which ends the iteration.
        for line in iter(out.readline, b''):
            queue.put(line)
    finally:
        # Close the stream even if reading or queueing raised, so the
        # underlying pipe is not leaked.
        out.close()
class NonblockingProcess:
    """Child process whose stdout can be read line by line without blocking.

    A daemon thread moves the child's stdout into a queue; ``read`` and
    ``read_nowait`` take data from that queue instead of from the pipe, so
    the caller decides how long (if at all) it is willing to wait.
    """

    def __init__(self, cmd: List[str]):
        """Start *cmd* with piped stdin/stdout and begin draining its stdout.

        Args:
            cmd: Program and arguments; a copy of the list is given to Popen.
        """
        # bufsize=0 opens the binary pipes unbuffered (line buffering is not
        # available for binary streams).
        p = Popen(cmd[:], stdin=PIPE, stdout=PIPE, bufsize=0, close_fds=ON_POSIX)
        q = Queue()
        t = Thread(target=enqueue_output, args=(p.stdout, q))
        t.daemon = True  # thread dies with the program
        t.start()
        self._p = p
        self._q = q
        # Bytes taken from the queue that have not been returned yet.
        self._read_buffer = b''

    def read(self, timeout=None) -> Optional[bytes]:
        """Return the next line of child output, waiting if necessary.

        Args:
            timeout: Maximum number of seconds to wait for new output.
                ``None`` waits until the reader thread queues something.

        Returns:
            One line including its trailing ``b'\\n'``, or ``None`` if no
            complete line became available.
        """
        return self._next_line(block=True, timeout=timeout)

    def read_nowait(self) -> Optional[bytes]:
        """Return the next line of child output without waiting.

        Returns:
            One line including its trailing ``b'\\n'``, or ``None`` if no
            complete line is available right now.
        """
        return self._next_line(block=False)

    def _next_line(self, block: bool, timeout=None) -> Optional[bytes]:
        """Serve a buffered line, else pull one item from the queue and retry."""
        line = self._split_line_from_buffer()
        if line is not None:
            return line
        try:
            self._read_buffer += self._q.get(block=block, timeout=timeout)
        except Empty:
            # Nothing new arrived; the buffer still holds no complete line.
            return None
        return self._split_line_from_buffer()

    def _split_line_from_buffer(self) -> Optional[bytes]:
        """Remove and return the first newline-terminated line in the buffer.

        Returns:
            The line including its ``b'\\n'``, or ``None`` when the buffer
            contains no newline. An empty line (``b'\\n'`` alone) is returned
            like any other line.
        """
        head, newline, rest = self._read_buffer.partition(b'\n')
        if not newline:
            return None
        self._read_buffer = rest
        return head + newline

    def write(self, line: bytes) -> None:
        """Write *line* to the child's stdin."""
        self._p.stdin.write(line)
if __name__ == '__main__':
    import time

    # Manual smoke test: drive `cat`, which sends back whatever it receives.
    proc = NonblockingProcess(['cat'])

    # Nothing has been written yet, so a non-blocking read yields nothing.
    reply = proc.read_nowait() or b''
    print(r"line = %s (should be '')" % repr(reply.decode()))

    # Each pair is (text to send, report template for the echoed line).
    for word, report in (
        ("hello\n", r"line = %s (should be 'hello\n')"),
        ("world\n", r"line = %s (should be 'world\n')"),
    ):
        proc.write(word.encode())
        time.sleep(0.01)  # give the child a moment to send the line back
        reply = proc.read_nowait() or b''
        print(report % repr(reply.decode()))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment