Instantly share code, notes, and snippets.

Embed
What would you like to do?
Demo code for my post about python's blocking stream reading functions.
from subprocess import Popen, PIPE
from time import sleep
# run the shell as a subprocess:
p = Popen(['python', 'shell.py'],
stdin = PIPE, stdout = PIPE, stderr = PIPE, shell = False)
# issue command:
p.stdin.write('command\n')
# let the shell output the result:
sleep(0.1)
# get the output
while True:
output = p.stdout.read() # <-- Hangs here!
if not output:
print '[No more data]'
break
print output
from subprocess import Popen, PIPE
from time import sleep
from fcntl import fcntl, F_GETFL, F_SETFL
from os import O_NONBLOCK, read
# run the shell as a subprocess:
p = Popen(['python', 'shell.py'],
stdin = PIPE, stdout = PIPE, stderr = PIPE, shell = False)
# set the O_NONBLOCK flag of p.stdout file descriptor:
flags = fcntl(p.stdout, F_GETFL) # get current p.stdout flags
fcntl(p.stdout, F_SETFL, flags | O_NONBLOCK)
# issue command:
p.stdin.write('command\n')
# let the shell output the result:
sleep(0.1)
# get the output
while True:
try:
print read(p.stdout.fileno(), 1024),
except OSError:
# the os throws an exception if there is no data
print '[No more data]'
break
from subprocess import Popen, PIPE
from time import sleep
from nbstreamreader import NonBlockingStreamReader as NBSR
# run the shell as a subprocess:
p = Popen(['python', 'shell.py'],
stdin = PIPE, stdout = PIPE, stderr = PIPE, shell = False)
# wrap p.stdout with a NonBlockingStreamReader object:
nbsr = NBSR(p.stdout)
# issue command:
p.stdin.write('command\n')
# get the output
while True:
output = nbsr.readline(0.1) # 0.1 secs to let the shell output the result
if not output:
print '[No more data]'
break
print output
from threading import Thread
from Queue import Queue, Empty
class NonBlockingStreamReader:
def __init__(self, stream):
'''
stream: the stream to read from.
Usually a process' stdout or stderr.
'''
self._s = stream
self._q = Queue()
def _populateQueue(stream, queue):
'''
Collect lines from 'stream' and put them in 'quque'.
'''
while True:
line = stream.readline()
if line:
queue.put(line)
else:
raise UnexpectedEndOfStream
self._t = Thread(target = _populateQueue,
args = (self._s, self._q))
self._t.daemon = True
self._t.start() #start collecting lines from the stream
def readline(self, timeout = None):
try:
return self._q.get(block = timeout is not None,
timeout = timeout)
except Empty:
return None
class UnexpectedEndOfStream(Exception): pass
import sys
while True:
s = raw_input("Enter command: ")
print "You entered: {}".format(s)
sys.stdout.flush()
@ab77

This comment has been minimized.

ab77 commented Mar 18, 2016

Thank you! You've saved me quite a bit of time 😁

@NamPNQ

This comment has been minimized.

NamPNQ commented Apr 9, 2016

Nice! Thank you.

@Gerardwx

This comment has been minimized.

Gerardwx commented Oct 9, 2016

This is great ... what's the license?

Under python3 I had to change the import statement in nbstreamreader.py to from queue import Queue, Empty (lowercase module name)

@Fifan31

This comment has been minimized.

Fifan31 commented Dec 2, 2016

Thank you! You've saved my day!

@vtthanh83

This comment has been minimized.

vtthanh83 commented Feb 7, 2018

Thank you so much

@Fonata

This comment has been minimized.

Fonata commented Mar 11, 2018

Stating the obvious: This will not work on Microsoft Windows ("No module named 'fcntl'").

@JkShah1992

This comment has been minimized.

JkShah1992 commented Mar 26, 2018

Hi, So I was trying your code. And it ended without even waiting to get some input from a user. There seems to be wrong about your code.
Please can you take a look?
I ran client_thread.py. I have shell.py and nbstreamreader.py.

Output i received:

Enter command: You entered: command

[No more data]

Process finished with exit code 0

@sinamr66

This comment has been minimized.

sinamr66 commented Sep 20, 2018

Same problem here, how can we use this code?

@luipir

This comment has been minimized.

luipir commented Sep 28, 2018

Hi @EyalAr I used your code with reference in GPL2 code here:
https://gitlab.com/cartolab/geomove_pipelines/blob/master/processing_scripts_and_models/pdal_pipeline_executor.py#L340
Thanks for shareing your solution.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment