Skip to content

Instantly share code, notes, and snippets.

@zed
Created November 27, 2010 01:49
Show Gist options
  • Save zed/717467 to your computer and use it in GitHub Desktop.
Save zed/717467 to your computer and use it in GitHub Desktop.
Run all processes in parallel. Group output by process groups and order within groups
#!/usr/bin/env python
"""Run all processes in parallel.
Group output by process group; within each group, print it in process order.
"""
import shlex
import subprocess
import sys
import threading
from collections import defaultdict
try:
from queue import Queue
except ImportError:
from Queue import Queue # python 2.x
# Unique sentinel used in queue messages (compared with `is`):
# - in the `stdout` slot it marks a "group size" message;
# - in the `group_id` slot it marks "all processes are complete".
NOT_ID = object()
def read_stdout(process, queue, group_id, command_id):
    """Read stdout from the `process` and put it in the `queue`.

    The whole stream is read with a single ``read()`` call and enqueued
    as a ``(stdout, group_id, command_id)`` tuple.

    The process' stdout is closed even if reading or enqueueing raises,
    so a failure does not leak the pipe; the exception still propagates.
    """
    try:
        queue.put((process.stdout.read(), group_id, command_id))
    finally:
        process.stdout.close()
def print_stdout(queue, no_sync=False, file_=None):
    """Print stdout from the `queue`.

    Group output by process' groups and processes order within groups.
    If `no_sync` is true then print the process stdout immediately.

    Items taken from the `queue` are ``(stdout, group_id, command_id)``
    tuples with three possible meanings:

    - ``group_id is NOT_ID`` -- all processes are complete; stop;
    - ``stdout is NOT_ID`` -- `command_id` holds the number of processes
      in the group `group_id`;
    - otherwise -- `stdout` is the output of process `command_id` of the
      group `group_id`.

    `file_` is the stream to write to.  It defaults to ``sys.stdout`` on
    Python 2 and to ``sys.stdout.buffer`` on Python 3.

    A group announced with size 0 is considered complete immediately.
    """
    if file_ is None:
        file_ = sys.stdout if sys.version_info < (3,) else sys.stdout.buffer

    outputs = defaultdict(dict)  # group_id -> {command_id: stdout}
    group_size = {}  # group_id -> number of processes in the group
    while True:
        stdout, group_id, command_id = queue.get()
        queue.task_done()
        if group_id is NOT_ID:
            # all processes complete; every group must have been printed
            assert not outputs
            assert not group_size
            break

        if no_sync:
            if stdout is not NOT_ID:
                # print output immediately
                file_.write(stdout)
                file_.flush()
            continue

        if stdout is NOT_ID:
            # command_id is the number of processes in the group
            # in this case
            group_size[group_id] = command_id
        else:  # save output for later
            outputs[group_id][command_id] = stdout

        if group_id not in group_size:
            continue  # the size of the group is not known yet
        # .get() neither requires nor creates an entry, so a group of
        # size 0 (which never receives any output) is completed too
        received = outputs.get(group_id, {})
        if len(received) != group_size[group_id]:
            continue  # still waiting for some processes of the group

        # group complete; print group stdout in order
        for ordered_id in sorted(received):
            file_.write(received[ordered_id])
        file_.flush()
        outputs.pop(group_id, None)  # preserve memory
        del group_size[group_id]  # done with the group
def gen_command_groups():
    """Generate command groups to demo the script.

    Yields five groups.  Every group is a list of commands and every
    command is an argument list (suitable for `subprocess.Popen`):

    - one group with a single `printf` command that does not sleep;
    - one group with two `sh` commands that sleep and then echo;
    - three groups, each with an `sh` command and a `python` command
      that sleep and then print.

    Every group is a real list (not a lazy `map` object on Python 3),
    so consumers may take its length or iterate over it more than once.
    """
    yield [['printf', 'gid {0:3} pid 0 no sleep\n'.format(0)]]

    sleep_then_echo = ("sh -c 'sleep {1};\n"
                       "/bin/echo -e \"gid {0:3} pid {2} sleep {1}\"'\n")
    yield [shlex.split(sleep_then_echo.format(100, 4, i)) for i in range(2)]

    n = 3
    shell_command = ("sh -c 'sleep {2}; "
                     "/bin/echo -e \"gid {0:3} pid 0 sleep {2}\" '\n")
    python_command = ("python -c 'import sys, time; "
                      "delay = {1}-{2}; "
                      "time.sleep(delay); "
                      "print(\"gid {0:3} pid 1 sleep %d\" % delay)' ")
    for gid in range(n):
        yield [shlex.split(shell_command.format(10 - gid, n, 2)),
               shlex.split(python_command.format(10 - gid, n, gid))]
def start_thread(target, args=(), threads=None):
    """Run `target(*args)` in a new daemon thread.

    `target` and `args` are handed to `threading.Thread()` unchanged.
    When a `threads` list is given, the new thread is appended to it
    before the thread is started.  Nothing is returned.
    """
    worker = threading.Thread(target=target, args=args)
    # daemon threads do not keep the interpreter alive at exit
    worker.daemon = True
    if threads is not None:
        threads.append(worker)
    worker.start()
def burn_cpu():
    """Spin forever, incrementing a counter, to keep a CPU busy.

    The counter starts at 1 and only grows, so the loop condition never
    becomes false and the function does not return.
    """
    counter = 1
    while counter != 0:
        counter += 1
def main():
    """Entry point for the script.

    See module docstring.

    Starts a CPU-burning thread, a printer thread and one process (plus
    one reader thread) per command from `gen_command_groups()`, then
    waits for the processes, the readers and the printer, in that order.
    Pass ``--no-sync`` on the command line to print output as it arrives.

    Returns 1 if any process exited with a non-zero status; otherwise
    returns None.
    """
    # make cpu busy
    start_thread(burn_cpu)

    queue = Queue()
    writer_threads = []
    reader_threads = []
    # start thread that prints processes stdout
    start_thread(print_stdout, (queue, "--no-sync" in sys.argv),
                 writer_threads)

    # start processes
    processes = []
    for group_id, commands in enumerate(gen_command_groups()):
        # materialize the group: its size must not depend on a loop
        # variable that is unbound (or stale) when the group is empty
        commands = list(commands)
        if not commands:
            continue  # nothing to run or to print for an empty group
        for command_id, command in enumerate(commands):
            p = subprocess.Popen(command,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT,
                                 close_fds=True)
            processes.append(p)
            # start thread that reads the process stdout
            start_thread(read_stdout, (p, queue, group_id, command_id),
                         reader_threads)
        # put group size into the queue
        queue.put((NOT_ID, group_id, len(commands)))

    # wait for processes to complete
    success = True
    for p in processes:
        return_code = p.wait()
        if return_code != 0:
            success = False

    # wait for reader threads to complete
    for t in reader_threads:
        t.join()

    # signal processes completion
    # . no output to be put into the queue after this point
    queue.put((None, NOT_ID, None))

    # wait for all output to be printed
    for t in writer_threads:
        t.join()
    print('done.')

    if not success:
        return 1
# Run the demo only when executed as a script; the value returned by
# main() becomes the process exit status.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment