Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Demo code for my post about Python's blocking stream-reading functions.
from subprocess import Popen, PIPE
from time import sleep
# Launch the shell script as a child process, with stdin, stdout and
# stderr all connected to pipes.
p = Popen(['python', 'shell.py'],
          stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=False)

# Send one command to the shell.
p.stdin.write('command\n')

# Give the shell a moment to write its answer.
sleep(0.1)

# Collect the output.  read() is called without a size, so it does not
# return until the stream reaches end-of-file; this is the call that
# demonstrates the blocking behaviour.
for output in iter(p.stdout.read, ''):  # <-- Hangs here!
    print(output)
print('[No more data]')
from subprocess import Popen, PIPE
from time import sleep
from fcntl import fcntl, F_GETFL, F_SETFL
from os import O_NONBLOCK, read
import errno
import sys

# Run the shell as a subprocess, with all three standard streams piped.
p = Popen(['python', 'shell.py'],
          stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=False)

# Set the O_NONBLOCK flag on p.stdout's file descriptor, so that reading
# it fails immediately instead of waiting when no data is available.
flags = fcntl(p.stdout, F_GETFL)  # get current p.stdout flags
fcntl(p.stdout, F_SETFL, flags | O_NONBLOCK)

# Issue a command.
p.stdin.write('command\n')

# Let the shell output the result.
sleep(0.1)

# Get the output, one chunk of at most 1024 bytes at a time.
while True:
    try:
        chunk = read(p.stdout.fileno(), 1024)
    except OSError as err:
        # On a non-blocking descriptor the OS reports "nothing to read
        # right now" as EAGAIN/EWOULDBLOCK.  Any other error is a real
        # failure and must not be mistaken for "no more data".
        if err.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
            raise
        chunk = ''
    if not chunk:
        # Either no data is pending, or the child closed its stdout:
        # read() returns an empty string at end-of-file without raising,
        # so without this check the loop would never terminate.
        print('[No more data]')
        break
    # Write the chunk verbatim; a print statement with a trailing comma
    # would insert extra spaces between consecutive chunks.
    sys.stdout.write(chunk)
from subprocess import Popen, PIPE
from time import sleep
from nbstreamreader import NonBlockingStreamReader as NBSR
# Start the shell as a child process with piped standard streams.
p = Popen(['python', 'shell.py'],
          stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=False)

# Hand p.stdout over to a NonBlockingStreamReader.
nbsr = NBSR(p.stdout)

# Send one command to the shell.
p.stdin.write('command\n')

# Print every line that arrives; stop at the first readline() call that
# comes back empty-handed.
while True:
    output = nbsr.readline(0.1)  # 0.1 secs to let the shell output the result
    if output:
        print(output)
        continue
    print('[No more data]')
    break
from threading import Thread
from Queue import Queue, Empty
class NonBlockingStreamReader(object):
    '''
    Read lines from a stream without blocking the caller.

    A daemon thread calls readline() on the wrapped stream in a loop and
    puts every line it gets on an internal queue; readline() on this
    object takes lines off that queue.
    '''

    def __init__(self, stream):
        '''
        stream: the stream to read from.
                Usually a process' stdout or stderr.
        '''
        self._s = stream
        self._q = Queue()
        # Set to True by the reader thread once the stream has ended.
        self._eof = False

        def _populateQueue(stream, queue):
            '''
            Collect lines from 'stream' and put them in 'queue'.
            '''
            while True:
                line = stream.readline()
                if not line:
                    # End of stream.  An exception raised here could not
                    # be caught by any caller (it would only kill this
                    # thread with a traceback), so record the condition
                    # and stop reading instead.
                    self._eof = True
                    return
                queue.put(line)

        self._t = Thread(target=_populateQueue,
                         args=(self._s, self._q))
        self._t.daemon = True
        self._t.start()  # start collecting lines from the stream

    @property
    def eof(self):
        '''
        True once the stream has ended and every line read from it has
        been returned by readline().
        '''
        # The reader thread queues the last line before setting _eof, so
        # when _eof is True the queue already holds everything there is.
        return self._eof and self._q.empty()

    def readline(self, timeout=None):
        '''
        Return the next line collected from the stream.

        timeout: seconds to wait for a line to arrive.  None (the
                 default) means do not wait at all.

        Returns None if no line is available within that time.
        '''
        try:
            return self._q.get(block=timeout is not None,
                               timeout=timeout)
        except Empty:
            return None
class UnexpectedEndOfStream(Exception):
    '''Indicates that a stream being read has run out of data.'''
import sys
# Minimal interactive "shell": prompt for a command on stdin and echo it
# back on stdout, until stdin is closed.
while True:
    try:
        s = raw_input("Enter command: ")
    except EOFError:
        # stdin was closed (for example, the parent process closed its
        # end of the pipe or exited): stop instead of dying with a
        # traceback.
        break
    print("You entered: {}".format(s))
    # stdout may be buffered when it is a pipe; flush so the reader sees
    # the reply straight away.
    sys.stdout.flush()

ab77 commented Mar 18, 2016

Thank you! You've saved me quite a bit of time 😁

NamPNQ commented Apr 9, 2016

Nice! Thank you.

Gerardwx commented Oct 9, 2016

This is great ... what's the license?

Under Python 3 I had to change the import statement in nbstreamreader.py to `from queue import Queue, Empty` (lowercase module name).

Fifan31 commented Dec 2, 2016

Thank you! You've saved my day!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment