Last active
December 5, 2021 14:28
-
-
Save tos-kamiya/a235ab6114589099f9ff59104d9b82db to your computer and use it in GitHub Desktop.
Non-blocking read from child process's stdout
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ref: https://stackoverflow.com/questions/375427/a-non-blocking-read-on-a-subprocess-pipe-in-python
# The answer in that article produced the following warning and did not work properly for me, so this is a fixed version:
# /usr/lib/python3.8/subprocess.py:842: RuntimeWarning: line buffering (buffering=1) isn't supported in binary mode, the default buffer size will be used
from typing import * | |
import sys | |
from subprocess import PIPE, Popen | |
from threading import Thread | |
from queue import Queue, Empty | |
ON_POSIX = 'posix' in sys.builtin_module_names | |
def enqueue_output(out, queue):
    """Copy every line of *out* onto *queue*, then close *out*.

    Meant to run in a background thread: ``out.readline()`` may block, so the
    caller polls *queue* instead of reading the stream directly.

    Args:
        out: Binary stream to drain; reading stops when ``readline`` returns
            ``b''`` (end of stream).
        queue: Object with a ``put`` method that receives each line as read.
    """
    while True:
        chunk = out.readline()
        if chunk == b'':
            # An empty read marks end of stream.
            break
        queue.put(chunk)
    out.close()
class NonblockingProcess:
    """Child process whose stdout can be read line by line without blocking.

    A daemon thread runs ``enqueue_output`` on the child's stdout and puts
    everything it reads onto a queue; ``read`` and ``read_nowait`` take data
    from that queue and hand it back one line at a time. All data is
    ``bytes``.
    """

    def __init__(self, cmd: List[str]):
        """Start *cmd* with piped stdin/stdout and begin collecting its output.

        Args:
            cmd: Program and arguments; a copy of the list is given to
                ``Popen``.
        """
        # bufsize=0 requests unbuffered pipes (line buffering is not
        # available for binary pipes).
        p = Popen(cmd[:], stdin=PIPE, stdout=PIPE, bufsize=0, close_fds=ON_POSIX)
        q = Queue()
        t = Thread(target=enqueue_output, args=(p.stdout, q))
        t.daemon = True  # thread dies with the program
        t.start()
        self._p = p
        self._q = q
        self._read_buffer = b''  # bytes received but not yet returned as a line

    def read(self, timeout=None) -> Optional[bytes]:
        """Return the next output line, waiting for one if necessary.

        Args:
            timeout: Maximum number of seconds to wait for new data; ``None``
                waits until data arrives.

        Returns:
            The line including its trailing ``b'\\n'``, or ``None`` when no
            complete line became available.
        """
        return self._read_line(True, timeout)

    def read_nowait(self) -> Optional[bytes]:
        """Return the next output line if one is already available.

        Returns:
            The line including its trailing ``b'\\n'``, or ``None`` when no
            complete line is available right now.
        """
        return self._read_line(False)

    def _read_line(self, block: bool, timeout=None) -> Optional[bytes]:
        """Serve a line from the buffer, pulling one queue item if needed."""
        line = self._split_line_from_buffer()
        if line is not None:
            return line
        try:
            self._read_buffer += self._q.get(block, timeout)
        except Empty:
            # Nothing new arrived; the buffer still holds no complete line.
            return None
        return self._split_line_from_buffer()

    def _split_line_from_buffer(self) -> Optional[bytes]:
        """Remove and return the first complete line held in the buffer.

        A blank line (a lone ``b'\\n'`` at the very start of the buffer) is a
        complete line too; treating it as "no line" would leave it stuck at
        the head of the buffer and block every line behind it.

        Returns:
            The line including its trailing ``b'\\n'``, or ``None`` when the
            buffer contains no newline.
        """
        head, newline, rest = self._read_buffer.partition(b'\n')
        if not newline:
            return None
        self._read_buffer = rest
        return head + newline

    def write(self, line: bytes) -> None:
        """Write *line* to the child's stdin.

        Args:
            line: Raw bytes to send; include the trailing newline if the
                child expects one.
        """
        self._p.stdin.write(line)
if __name__ == '__main__':
    # Demo: run `cat` as the child, echo two lines through it, and show what
    # a non-blocking read returns before and after each write.
    import time

    def _pending_text(proc):
        """Decode the next ready line from *proc*, or '' when none is ready."""
        return (proc.read_nowait() or b'').decode()

    child = NonblockingProcess(['cat'])

    # Nothing has been written yet, so no line should be available.
    text = _pending_text(child)
    print(r"line = %s (should be '')" % repr(text))

    child.write("hello\n".encode())
    time.sleep(0.01)  # give the child and the reader thread a moment to echo
    text = _pending_text(child)
    print(r"line = %s (should be 'hello\n')" % repr(text))

    child.write("world\n".encode())
    time.sleep(0.01)
    text = _pending_text(child)
    print(r"line = %s (should be 'world\n')" % repr(text))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment