Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zed/b3c2709b4d22f98e1da2 to your computer and use it in GitHub Desktop.
Save zed/b3c2709b4d22f98e1da2 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
"""Parallel version of
http://stackoverflow.com/a/32741930/
"""
import collections
import os
import select
import signal
import subprocess
import threading
# Child scripts, keyed by name; readfromapp() runs one as
# ``python -u -c <script> <delay>``. Each writes 'aaa' to stdout (fd 1), then
# a run of 'b's to stderr (fd 2), then 'ccc' to stdout, using raw os.write()
# calls. The payloads are byte literals because os.write() only accepts bytes
# on Python 3; b'' literals are equally valid on Python 2.6+.
testapps = {
    # sleeps float(sys.argv[1]) seconds between consecutive writes
    'slow': '''
import os
import time
import sys
t = float(sys.argv[1])
os.write(1, b'aaa')
time.sleep(t)
os.write(2, b'bbb')
time.sleep(t)
os.write(1, b'ccc')
''',
    # the same three writes back to back, no sleeps
    'fast': '''
import os
os.write(1, b'aaa')
os.write(2, b'bbb')
os.write(1, b'ccc')
''',
    # like 'fast', but with a longer stderr write
    'fast2': '''
import os
os.write(1, b'aaa')
os.write(2, b'bbbbbbbbbbbbbbb')
os.write(1, b'ccc')
''',
}
def readfds(fds, maxread):
    """Multiplex reads over the descriptors in *fds* until every one hits EOF.

    Yields ``(fd, chunk)`` tuples, where *chunk* is a non-empty result of
    ``os.read(fd, maxread)``. *fds* is modified in place: a descriptor is
    removed from it as soon as a read on it returns no data (EOF), and the
    generator finishes once *fds* is empty.
    """
    while fds:
        ready, _unused_w, _unused_x = select.select(fds, [], [])
        for ready_fd in ready:
            chunk = os.read(ready_fd, maxread)
            if chunk:
                yield ready_fd, chunk
            else:
                fds.remove(ready_fd)  # EOF: stop watching this descriptor
def readfromapp(app, rounds=10, maxread=1024, delay=0.01):
    """Run *rounds* copies of ``testapps[app]`` in parallel and tally their output.

    Each child gets separate stdout/stderr pipes and is sent SIGSTOP right
    after it is spawned. Once all are started, a helper thread sends SIGCONT
    to every child while this thread drains all pipes with ``readfds``.
    Both pipes of one child append to the same bytearray, so the stored bytes
    reflect the order in which this process read the two streams. Identical
    outputs are counted and printed, most common first.

    If spawning or reading fails, every child that is still alive is killed
    (SIGKILL also terminates stopped processes), so no paused children are
    left behind. In every case all pipes are closed and all children reaped.

    :param app: key of the child script in ``testapps``.
    :param rounds: number of child processes to run.
    :param maxread: maximum number of bytes per ``os.read`` call.
    :param delay: passed to the child as its first command-line argument.
    """
    processes = {}  # Popen -> bytearray shared by its stdout and stderr
    fds = {}  # pipe fd -> bytearray of the process owning that pipe
    try:
        # start all processes at once (in parallel), pausing each immediately
        for _ in range(rounds):
            p = subprocess.Popen(['python', '-u', '-c', testapps[app], str(delay)],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 bufsize=0,
                                 close_fds=True)
            # track the child before signalling it, so the cleanup below
            # covers it even if send_signal() fails
            processes[p] = bytearray()
            p.send_signal(signal.SIGSTOP)  # pause unconditionally
            fds[p.stdout.fileno()] = fds[p.stderr.fileno()] = processes[p]

        def wake_up():
            for child in processes:
                child.send_signal(signal.SIGCONT)  # wake up

        # SIGCONT is sent from a separate thread, so this thread can start
        # draining pipes while children are still being resumed.
        waker = threading.Thread(target=wake_up)
        waker.start()
        try:
            for fd, data in readfds(set(fds), maxread):
                fds[fd] += data
        finally:
            waker.join()  # never leave the helper thread running
    except BaseException:
        # don't leave SIGSTOPped (or otherwise unfinished) children behind
        for p in processes:
            if p.poll() is None:
                p.kill()
        raise
    finally:
        for p in processes:  # close pipes and reap child processes
            p.stdout.close()
            p.stderr.close()
            p.wait()

    results = collections.Counter(bytes(data) for data in processes.values())
    # display results
    print('running %i rounds %s with maxread=%i' % (rounds, app, maxread))
    for data, count in results.most_common():
        print('% 3d x %s' % (count, data))
def main():
    """Run the demo scenarios, printing an explanation before each one.

    Each scenario calls readfromapp() on one of the ``testapps`` scripts with
    100 rounds, varying the child's delay and the per-read size.
    """
    print("\n=> if output is produced slowly this should work as wished XXX")
    print(" and should return: aaabbbccc")
    readfromapp('slow', rounds=100, maxread=1024)

    print("\n=> if output is produced even more slowly this should work as wished")
    print(" and should return: aaabbbccc")
    readfromapp('slow', rounds=100, maxread=1024, delay=1)

    print("\n=> now mostly aaacccbbb is returned, not as it should be")
    readfromapp('fast', rounds=100, maxread=1024)

    print("\n=> you could try to read data one by one, and return")
    print(" e.g. a whole line only when LF is read")
    print(" (b's should be finished before c's)")
    readfromapp('fast', rounds=100, maxread=1)

    print("\n=> but even this won't work ...")
    readfromapp('fast2', rounds=100, maxread=1)


# Only run the (process-heavy) demo when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment