Last active
July 30, 2022 00:16
-
-
Save zed/b3c2709b4d22f98e1da2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
"""Parallel version of
http://stackoverflow.com/a/32741930/
"""
import collections | |
import os | |
import select | |
import signal | |
import subprocess | |
import threading | |
# Source code of the child programs, keyed by name; each one is run with
# ``python -u -c <source> <delay>``.  All of them write 'aaa' to stdout,
# then b's to stderr, then 'ccc' to stdout.
#
# The payloads are bytes literals because os.write() only accepts
# bytes-like objects on Python 3; the b'' prefix is also valid on
# Python 2.6+, so the children run under either interpreter.
testapps = {
    # pauses for <delay> seconds (first command-line argument) between writes
    'slow': '''
import os
import time
import sys
t = float(sys.argv[1])
os.write(1, b'aaa')
time.sleep(t)
os.write(2, b'bbb')
time.sleep(t)
os.write(1, b'ccc')
''',
    # same three writes, back to back
    'fast': '''
import os
os.write(1, b'aaa')
os.write(2, b'bbb')
os.write(1, b'ccc')
''',
    # like 'fast', but with a longer stderr payload
    'fast2': '''
import os
os.write(1, b'aaa')
os.write(2, b'bbbbbbbbbbbbbbb')
os.write(1, b'ccc')
''',
}
def readfds(fds, maxread):
    """Yield ``(fd, data)`` chunks from *fds* until every descriptor hits EOF.

    Blocks in ``select.select`` until at least one descriptor is readable,
    then reads at most *maxread* bytes from each ready descriptor.  Once
    ``os.read`` returns an empty result for a descriptor it is removed from
    *fds* -- the caller's collection is mutated in place -- and the
    generator finishes when *fds* is empty.
    """
    while fds:
        ready, _, _ = select.select(fds, [], [])
        for source in ready:
            chunk = os.read(source, maxread)
            if chunk:
                yield source, chunk
            else:  # an empty read signals EOF on this descriptor
                fds.remove(source)
def readfromapp(app, rounds=10, maxread=1024, delay=0.01):
    """Run test app *app* ``rounds`` times in parallel and tally the outputs.

    Every child is started with stdout and stderr on separate pipes and is
    paused with SIGSTOP right after it is spawned.  Once all children exist
    they are resumed with SIGCONT while this thread multiplexes over all
    pipes with ``readfds``.  Both pipes of one child append to the same
    buffer, so each buffer records the order in which the chunks were read.
    Identical buffers are grouped and the counts are printed, most common
    first.

    Args:
        app: key into ``testapps`` selecting the child program.
        rounds: number of child processes to run.
        maxread: maximum number of bytes fetched per ``os.read`` call.
        delay: passed to the child as its first command-line argument.

    If spawning or reading fails (including KeyboardInterrupt), children
    that are still alive are killed -- they may be sitting in SIGSTOP and
    would otherwise never exit -- and all pipes are closed and all children
    reaped before the exception propagates.
    """
    processes = {}  # Popen -> merged stdout/stderr buffer
    fds = {}  # pipe fd -> buffer of the process owning that pipe

    def wake_up():
        for p in processes:
            p.send_signal(signal.SIGCONT)  # wake up

    try:
        # start all processes at once (in parallel)
        for _ in range(rounds):
            p = subprocess.Popen(['python', '-u', '-c', testapps[app], str(delay)],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 bufsize=0,
                                 close_fds=True)
            p.send_signal(signal.SIGSTOP)  # pause unconditionally
            # deliberately ONE bytearray shared by both pipes and the process
            fds[p.stdout.fileno()] = fds[p.stderr.fileno()] = processes[p] = bytearray()

        # resume the children from a helper thread while this thread is
        # already reading their output
        waker = threading.Thread(target=wake_up)
        waker.start()
        for fd, data in readfds(set(fds), maxread):
            fds[fd] += data
        waker.join()
    except BaseException:
        # SIGKILL also terminates a stopped process, so wait() below
        # cannot block on a child that was never resumed.
        for p in processes:
            if p.poll() is None:
                p.kill()
        raise
    finally:
        for p in processes:
            p.stdout.close()
            p.stderr.close()
            p.wait()  # reap child processes

    # group identical results
    results = collections.Counter(bytes(data) for data in processes.values())

    # display results
    print('running %i rounds %s with maxread=%i' % (rounds, app, maxread))
    for data, count in results.most_common():
        print('% 3d x %s' % (count, data))
# Demo driver: each entry is (banner lines to print, test app name,
# keyword arguments for readfromapp); the entries run in the order listed.
for banner, app, options in (
    (("\n=> if output is produced slowly this should work as wished XXX",
      " and should return: aaabbbccc"),
     'slow', dict(rounds=100, maxread=1024)),
    (("\n=> if output is produced even more slowly this should work as wished",
      " and should return: aaabbbccc"),
     'slow', dict(rounds=100, maxread=1024, delay=1)),
    (("\n=> now mostly aaacccbbb is returnd, not as it should be",),
     'fast', dict(rounds=100, maxread=1024)),
    (("\n=> you could try to read data one by one, and return",
      " e.g. a whole line only when LF is read",
      " (b's should be finished before c's)"),
     'fast', dict(rounds=100, maxread=1)),
    (("\n=> but even this won't work ...",),
     'fast2', dict(rounds=100, maxread=1)),
):
    for line in banner:
        print(line)
    readfromapp(app, **options)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment