Skip to content

Instantly share code, notes, and snippets.

@ymmn
Created October 5, 2017 23:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ymmn/dc2643808efdec9f4a752a46f0e4e3af to your computer and use it in GitHub Desktop.
Save ymmn/dc2643808efdec9f4a752a46f0e4e3af to your computer and use it in GitHub Desktop.
import subprocess
import time
from queue import Queue, Empty
from threading import Thread
import sys
def _enqueue_output(out, queue):
    """Copy every line read from ``out`` onto ``queue``, then close ``out``.

    Meant to be run on a background thread so that the blocking
    ``readline`` call on a child's pipe never stalls the consumer.

    Args:
        out: A readable file object such as ``Popen.stdout``. Binary and
            text-mode streams are both accepted.
        queue: Object with a ``put`` method (e.g. ``queue.Queue``); it
            receives each line exactly as ``readline`` returned it.
    """
    # ``with`` guarantees the stream is closed even if reading or ``put``
    # raises part-way through.
    with out:
        while True:
            line = out.readline()
            # At EOF readline() returns b'' for binary streams and '' for
            # text streams.  Testing truthiness covers both; comparing
            # against b'' alone would loop forever on a text-mode pipe.
            if not line:
                break
            queue.put(line)
def stream_procs(procs, poll_interval_ms=100, timeout_ms=None):
    """Yield stdout lines from all ``procs`` until every one has finished.

    Each process's stdout is read on its own daemon thread and funnelled
    into one shared queue, so lines from different processes are yielded
    in the order they were queued.  Note that the processes themselves must
    not buffer their output (for example, run child Python processes with
    the ``python -u`` flag).

    Args:
        procs: Iterable of ``subprocess.Popen`` objects created with
            ``stdout=subprocess.PIPE``.
        poll_interval_ms: Longest time, in milliseconds, to wait for a new
            line before re-checking process exit and the timeout.
        timeout_ms: Optional overall limit in milliseconds.  Once it is
            exceeded, processes that are still running are killed; output
            they already produced is still yielded.  ``None`` disables
            the limit.

    Yields:
        Lines exactly as returned by each process's ``stdout.readline()``.
    """
    # Materialise once: ``procs`` is iterated several times below, which
    # would silently see nothing on later passes if it were a generator.
    procs = list(procs)
    poll_interval_s = poll_interval_ms / 1000.0
    # A monotonic clock keeps the timeout immune to wall-clock adjustments.
    deadline = None
    if timeout_ms is not None:
        deadline = time.monotonic() + timeout_ms / 1000.0

    output_q = Queue()
    output_threads = []
    for proc in procs:
        t = Thread(target=_enqueue_output, args=(proc.stdout, output_q))
        t.daemon = True  # thread dies with the program
        t.start()
        output_threads.append(t)

    while True:
        if deadline is not None and time.monotonic() > deadline:
            for proc in procs:
                if proc.poll() is None:
                    proc.kill()
                    # Reap the killed child so it does not linger as a
                    # zombie.
                    proc.wait()
            break
        if all(p.poll() is not None for p in procs):
            break
        try:
            # Block for at most one poll interval but return as soon as a
            # line is available, so throughput is not capped at one line
            # per interval.
            yield output_q.get(timeout=poll_interval_s)
        except Empty:
            pass

    # The reader threads exit once their pipe reaches EOF; joining them
    # ensures every remaining line is queued before the final flush.
    for thread in output_threads:
        thread.join()

    # flush output_q
    while True:
        try:
            yield output_q.get_nowait()
        except Empty:
            break
# Example usage.  Guarded so that it runs only when this file is executed
# as a script; importing stream_procs from another module must not spawn
# child processes.
if __name__ == '__main__':
    cmds = [
        'python -u slow_program.py',
        'python -u fast_program.py',
    ]
    # -u keeps the children unbuffered so lines arrive as they are printed.
    procs = [subprocess.Popen(cmd.split(' '), stdout=subprocess.PIPE)
             for cmd in cmds]
    for line in stream_procs(procs):
        # The pipes are opened in binary mode, so each line is bytes.
        processed_line = '>>> {}'.format(line.decode('utf-8'))
        sys.stdout.write(processed_line)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment