Create a gist now

Instantly share code, notes, and snippets.

Demo code for my post about Python's blocking stream-reading functions.
from subprocess import Popen, PIPE
from time import sleep
# Launch the shell as a child process with all three standard streams piped:
p = Popen(
    ['python', 'shell.py'],
    stdin=PIPE,
    stdout=PIPE,
    stderr=PIPE,
    shell=False,
)

# Send a single command to the shell:
p.stdin.write('command\n')

# Give the shell a moment to write its reply:
sleep(0.1)

# Drain the shell's output. This is the call the demo is about: read()
# with no size argument does not return here.
chunk = p.stdout.read()  # <-- Hangs here!
while chunk:
    print(chunk)
    chunk = p.stdout.read()
print('[No more data]')
from subprocess import Popen, PIPE
from time import sleep
from fcntl import fcntl, F_GETFL, F_SETFL
from os import O_NONBLOCK, read
# Launch the shell as a child process with all three standard streams piped:
p = Popen(
    ['python', 'shell.py'],
    stdin=PIPE,
    stdout=PIPE,
    stderr=PIPE,
    shell=False,
)

# Put the child's stdout pipe into non-blocking mode by adding O_NONBLOCK
# to the descriptor's current status flags:
flags = fcntl(p.stdout, F_GETFL)  # get current p.stdout flags
fcntl(p.stdout, F_SETFL, flags | O_NONBLOCK)

# Send a single command to the shell:
p.stdin.write('command\n')

# Give the shell a moment to write its reply:
sleep(0.1)

# Drain whatever output is available right now, up to 1024 bytes per read.
fd = p.stdout.fileno()
while True:
    try:
        data = read(fd, 1024)
    except OSError:
        # the os throws an exception if there is no data
        print('[No more data]')
        break
    if not data:
        # An empty result means end-of-file (the child closed its stdout).
        # os.read does not raise in that case, so without this check the
        # loop would spin forever once the shell exits.
        print('[No more data]')
        break
    # Trailing comma: Python 2 print statement, so no newline is appended.
    print(data),
from subprocess import Popen, PIPE
from time import sleep
from nbstreamreader import NonBlockingStreamReader as NBSR
# Launch the shell as a child process with all three standard streams piped:
p = Popen(
    ['python', 'shell.py'],
    stdin=PIPE,
    stdout=PIPE,
    stderr=PIPE,
    shell=False,
)

# Hand the child's stdout to a NonBlockingStreamReader:
nbsr = NBSR(p.stdout)

# Send a single command to the shell:
p.stdin.write('command\n')

# Print lines until a read comes back empty-handed.
line = nbsr.readline(0.1)  # 0.1 secs to let the shell output the result
while line:
    print(line)
    line = nbsr.readline(0.1)
print('[No more data]')
from threading import Thread
from Queue import Queue, Empty
class NonBlockingStreamReader(object):
    '''
    Read lines from a stream without blocking the caller indefinitely.

    A daemon thread, started by the constructor, calls the stream's
    readline() in a loop and stores every line in a queue. readline()
    on this object only ever consults that queue.
    '''

    def __init__(self, stream):
        '''
        stream: the stream to read from.
                Usually a process' stdout or stderr.
        '''
        self._s = stream
        self._q = Queue()

        def _populateQueue(stream, queue):
            '''
            Collect lines from 'stream' and put them in 'queue'.
            Returns when the stream reports end-of-file.
            '''
            while True:
                line = stream.readline()
                if not line:
                    # End of stream: stop quietly. An exception raised here
                    # could not be caught by any caller; it would only kill
                    # this background thread with a traceback.
                    return
                queue.put(line)

        self._t = Thread(target=_populateQueue,
                         args=(self._s, self._q))
        # Daemon thread: a reader stuck in a blocking readline() must not
        # keep the interpreter alive at exit.
        self._t.daemon = True
        self._t.start()  # start collecting lines from the stream

    def readline(self, timeout=None):
        '''
        Return the next collected line, or None if none is available.

        timeout: maximum number of seconds to wait for a line. With the
                 default of None the call does not wait at all and returns
                 None unless a line is already queued.
        '''
        try:
            return self._q.get(block=timeout is not None,
                               timeout=timeout)
        except Empty:
            return None
class UnexpectedEndOfStream(Exception):
    '''Error indicating that a stream reached its end unexpectedly.'''
import sys
# Minimal interactive shell: prompt for a command and echo it back, forever.
while True:
    try:
        s = raw_input("Enter command: ")
    except EOFError:
        # stdin was closed, so there is nothing more to read. Leave the
        # loop instead of dying with a traceback.
        break
    print("You entered: {}".format(s))
    # Flush right away so a parent process reading our stdout through a
    # pipe sees the reply without waiting for the buffer to fill.
    sys.stdout.flush()
@ab77
ab77 commented Mar 18, 2016

Thank you! You've saved me quite a bit of time 😁

@NamPNQ
NamPNQ commented Apr 9, 2016

Nice! Thank you.

@Gerardwx
Gerardwx commented Oct 9, 2016

This is great ... what's the license?

Under Python 3, I had to change the import statement in nbstreamreader.py to `from queue import Queue, Empty` (lowercase module name).

@Fifan31
Fifan31 commented Dec 2, 2016

Thank you! You've saved my day!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment