Skip to content

Instantly share code, notes, and snippets.

@jhorneman
Last active August 29, 2015 14:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jhorneman/75fc66433127f90a2197 to your computer and use it in GitHub Desktop.
Save jhorneman/75fc66433127f90a2197 to your computer and use it in GitHub Desktop.
Non-blocking communication with a subprocess using gevent
# -*- coding: utf-8 -*-
import sys
# Based on http://stackoverflow.com/a/4896288 - modified to use Gevent
# True when this interpreter ships the built-in 'posix' module. consume()
# passes this value as the close_fds argument when it starts the subprocess.
ON_POSIX = 'posix' in sys.builtin_module_names
def enqueue_output(_output_stream, _queue):
    """Copy every line read from ``_output_stream`` into ``_queue``.

    Reads with ``readline()`` until it returns an empty value (end of
    stream), putting each line on the queue unchanged (including its line
    terminator). The stream is always closed before returning, even if
    reading or queueing raises.

    :param _output_stream: file-like object providing ``readline()`` and
        ``close()``; may yield text or bytes.
    :param _queue: object providing ``put(item)``.
    """
    try:
        while True:
            line = _output_stream.readline()
            # An empty str *or* empty bytes marks EOF. Comparing against ""
            # only would never terminate on a byte stream.
            if not line:
                break
            _queue.put(line)
    finally:
        _output_stream.close()
def subprocess_done(_):
    """Announce that the subprocess output stream has ended.

    Registered in consume() as a link callback on the reader greenlet.

    :param _: the single argument supplied to the link callback; ignored.
    """
    # Single-argument print(...) produces identical output on Python 2 and 3.
    print("That's all folks!")
def consume():
    """Run this script in "produce" mode as a subprocess and echo its output.

    A greenlet running enqueue_output() moves the child's stdout lines onto
    a queue, while this function polls the queue without blocking. It prints
    each line it finds, or "Working..." followed by a short cooperative sleep
    when none is available.
    """
    import gevent
    from gevent import subprocess
    from gevent import queue

    # sys.executable guarantees the child runs under the same interpreter as
    # the parent (a bare 'python' may be missing or a different version).
    # universal_newlines=True opens the pipe in text mode so lines are str on
    # both Python 2 and 3.
    p = subprocess.Popen([sys.executable, __file__, 'produce'],
                         stdout=subprocess.PIPE, bufsize=1,
                         close_fds=ON_POSIX, universal_newlines=True)
    q = queue.Queue()
    g = gevent.spawn(enqueue_output, p.stdout, q)

    # We could test p.stdout.closed to see if the stream is still open (and
    # the subprocess is still going).
    # Better: test the enqueue_output greenlet - it ends when the stream is
    # closed.
    # We can also react asynchronously to the subprocess ending.
    g.link(subprocess_done)

    while True:
        try:
            line = q.get_nowait()  # or q.get(timeout=.1)
        except queue.Empty:
            # Only stop once the reader has finished AND the queue is empty;
            # checking g.ready() alone could drop lines that were queued just
            # before the reader ended.
            if g.ready():
                break
            print("Working...")
            gevent.sleep(.5)
        else:
            print("SUBPROCESS OUTPUT: %s" % line.strip())

    # Reap the child so it does not linger as a zombie process.
    p.wait()
def produce(steps=3, delay=1):
    """Emit a short, slow sequence of progress lines on stdout, then exit.

    Prints "Start", then "Step 1" .. "Step <steps>" with a pause before each
    step, then pauses once more and prints "Done". stdout is flushed after
    every line so a parent process reading the pipe sees each line as soon
    as it is written.

    :param steps: number of "Step N" lines to print (default 3).
    :param delay: seconds to sleep before each step and before "Done"
        (default 1).
    :raises SystemExit: always, with exit code 0, once "Done" is printed.
    """
    import time

    def emit(text):
        # Flush immediately; otherwise output may sit in the buffer when
        # stdout is a pipe.
        print(text)
        sys.stdout.flush()

    emit("Start")
    for x in range(1, steps + 1):
        time.sleep(delay)
        emit("Step %s" % x)
    time.sleep(delay)
    emit("Done")
    sys.exit(0)
if __name__ == "__main__":
    # "<script> produce" selects child (producer) mode; anything else runs
    # the parent (consumer). The slice is empty when no argument was given.
    run_as_producer = sys.argv[1:2] == ["produce"]
    if run_as_producer:
        produce()
    else:
        consume()
@jhorneman
Copy link
Author

This script runs on Python 2 only, but it would not be hard to make it Python 3 compatible.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment