Last active
July 25, 2020 08:37
-
-
Save lemon24/252ee6aa122a2b38395b096f96cb8e26 to your computer and use it in GitHub Desktop.
process_multiplexer.py (from threads to Curio)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Read commands from stdin, run them in parallel, and send the output to stdout. | |
Command output looks like: | |
<command_id> <pid> (stdout|stderr) <original_line> | |
When a command exits, a single line containing the status is output: | |
<command_id> <pid> exited <status_code> | |
Example: | |
$ cat | python3 process_multiplexer.py | |
echo one; sleep 2; echo two >&2 | |
echo hello world; sleep 1; echo bye; exit 1 | |
0 3504 stdout one | |
1 3509 stdout hello world | |
0 3504 stderr two | |
0 3504 exited 0 | |
1 3509 stdout bye | |
1 3509 exited 1 | |
Could be improved by showing a progress bar / summary on stderr. | |
Error handling could also use some improvement (it's almost non-existent). | |
Based on a similar (but better) tool written in Rust by | |
https://github.com/andreivasiliu | |
""" | |
import subprocess | |
from threading import Thread | |
from queue import Queue | |
def spawn(target, *args):
    """Start ``target(*args)`` on a fresh thread and hand back the Thread.

    Named after Curio's ``spawn`` so the threaded and async versions of
    this script read alike; callers ``join()`` the returned object.
    """
    worker = Thread(target=target, args=args)
    worker.start()
    return worker
def drain_file(queue, data, file):
    """Copy every line of *file* onto *queue*, tagged with *data*.

    Each line is queued as ``(data, line)``; once *file* is exhausted a
    final ``(data, None)`` is queued so the reader knows the stream ended.
    """
    put = queue.put
    for chunk in file:
        put((data, chunk))
    put((data, None))  # end-of-stream sentinel
def run_process(queue, data, args, kwargs):
    """Run one command and forward its output to *queue*.

    Puts ``(data, pid, 'stdout'|'stderr', line)`` for every output line,
    then ``(data, pid, 'exited', returncode)`` once the process has ended.
    If ``subprocess.Popen`` raises, puts ``(data, None, 'exception', exc)``
    instead and returns.

    *kwargs* are extra ``subprocess.Popen`` arguments.  The dict is copied,
    not modified: run_processes() hands the same dict to every runner thread.
    """
    # stdin defaults to DEVNULL so children cannot read (and steal) the
    # command list the parent is consuming from its own stdin; callers may
    # still override it.  stdout/stderr must be pipes for this to work.
    popen_kwargs = {'stdin': subprocess.DEVNULL}
    popen_kwargs.update(kwargs)
    popen_kwargs.update(stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    try:
        process = subprocess.Popen(args, **popen_kwargs)
    except Exception as e:
        queue.put((data, None, 'exception', e))
        return

    # Popen's context manager closes the pipe file objects and waits.
    with process:
        pid = process.pid
        line_queue = Queue()
        readers = [
            spawn(drain_file, line_queue, 'stdout', process.stdout),
            spawn(drain_file, line_queue, 'stderr', process.stderr),
        ]
        open_streams = {'stdout', 'stderr'}
        while open_streams:
            which, line = line_queue.get()
            if line is None:
                # drain_file's end-of-stream sentinel for one of the pipes.
                assert which in ('stdout', 'stderr'), "unexpected which: %s" % which
                open_streams.discard(which)
                continue
            queue.put((data, pid, which, line))
        for reader in readers:
            reader.join()
        process.wait()
    queue.put((data, pid, 'exited', process.returncode))
def run_processes(commands, **kwargs):
    """Run every command concurrently and yield their output as it arrives.

    Each item of *commands* is handed to run_process() on its own thread,
    together with *kwargs*.  Yields ``(command_id, pid, which, payload)``
    tuples, where ``command_id`` is the command's index in *commands*, and
    stops once every command has reported 'exited' or 'exception'.
    """
    events = Queue()
    pending = {
        command_id: spawn(run_process, events, command_id, args, kwargs)
        for command_id, args in enumerate(commands)
    }
    while pending:
        event = events.get()
        command_id, _, which, _ = event
        if which in ('exited', 'exception'):
            # That runner thread is finishing; reap it.
            pending.pop(command_id).join()
        yield event
def main():
    """Run each line of stdin as a shell command and multiplex the output.

    Records are written as ``<command_id> <pid> <which> <payload>``.
    Output lines and 'exited' statuses go to stdout.  'exception' records
    (the command could not be started) go to stderr, with ``-`` in place of
    the pid because no process exists.
    """
    import sys

    commands = sys.stdin.buffer
    for data, pid, which, line in run_processes(commands, shell=True):
        out = sys.stdout
        if which == 'exited' or which == 'exception':
            # Status payloads (return code / exception object) are not bytes.
            line = str(line).encode()
            if which == 'exception':
                out = sys.stderr
        # 'exception' records carry pid=None; '%i' would raise TypeError.
        pid_field = b'-' if pid is None else b'%i' % pid
        # A command's last output line may lack a trailing newline; add one
        # so the next record starts on its own line.
        if not line.endswith(b'\n'):
            line += b'\n'
        out.buffer.write(b'%i %s %s %s' % (data, pid_field, which.encode(), line))
        # Flush per record so output appears while commands are still
        # running, even when stdout/stderr is a pipe.
        out.buffer.flush()


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Read commands from stdin, run them in parallel, and send the output to stdout. | |
Command output looks like: | |
<command_id> <pid> (stdout|stderr) <original_line> | |
When a command exits, a single line containing the status is output: | |
<command_id> <pid> exited <status_code> | |
Example: | |
$ cat | python3 process_multiplexer_curio.py | |
echo one; sleep 2; echo two >&2 | |
echo hello world; sleep 1; echo bye; exit 1 | |
0 3504 stdout one | |
1 3509 stdout hello world | |
0 3504 stderr two | |
0 3504 exited 0 | |
1 3509 stdout bye | |
1 3509 exited 1 | |
Could be improved by showing a progress bar / summary on stderr. | |
Error handling could also use some improvement (it's almost non-existent). | |
Almost identical to process_multiplexer.py, but uses Curio tasks instead of | |
threads. | |
""" | |
from curio import run, spawn | |
from curio import subprocess | |
from curio import Queue | |
async def drain_file(queue, data, file):
    """Asynchronously copy each line of *file* into *queue*, tagged with *data*.

    Every line becomes ``(data, line)``; after the stream is exhausted a
    final ``(data, None)`` is put so the consumer knows it has ended.
    """
    put = queue.put
    async for chunk in file:
        await put((data, chunk))
    await put((data, None))  # end-of-stream sentinel
async def run_process(queue, data, args, kwargs):
    """Run one command and forward its output to *queue* (Curio task).

    Puts ``(data, pid, 'stdout'|'stderr', line)`` for every output line,
    then ``(data, pid, 'exited', returncode)`` once the process has ended.
    If ``subprocess.Popen`` raises, puts ``(data, None, 'exception', exc)``
    instead and returns.

    *kwargs* are extra ``Popen`` arguments.  The dict is copied, not
    modified: run_processes() hands the same dict to every runner task.
    """
    # stdin defaults to DEVNULL so children cannot read (and steal) the
    # command list the parent is consuming from its own stdin; callers may
    # still override it.  stdout/stderr must be pipes for this to work.
    popen_kwargs = {'stdin': subprocess.DEVNULL}
    popen_kwargs.update(kwargs)
    popen_kwargs.update(stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    try:
        process = subprocess.Popen(args, **popen_kwargs)
    except Exception as e:
        await queue.put((data, None, 'exception', e))
        return

    # Curio's Popen async context manager closes the pipe streams and waits.
    async with process:
        pid = process.pid
        line_queue = Queue()
        readers = [
            await spawn(drain_file, line_queue, 'stdout', process.stdout),
            await spawn(drain_file, line_queue, 'stderr', process.stderr),
        ]
        open_streams = {'stdout', 'stderr'}
        while open_streams:
            which, line = await line_queue.get()
            if line is None:
                # drain_file's end-of-stream sentinel for one of the pipes.
                assert which in ('stdout', 'stderr'), "unexpected which: %s" % which
                open_streams.discard(which)
                continue
            await queue.put((data, pid, which, line))
        for reader in readers:
            await reader.join()
        await process.wait()
    await queue.put((data, pid, 'exited', process.returncode))
async def run_processes(commands, **kwargs):
    """Run every command concurrently and yield their output as it arrives.

    Each item of *commands* is handed to run_process() in its own Curio
    task, together with *kwargs*.  Yields ``(command_id, pid, which,
    payload)`` tuples, where ``command_id`` is the command's index in
    *commands*, and stops once every command has reported 'exited' or
    'exception'.
    """
    events = Queue()
    pending = {}
    for command_id, args in enumerate(commands):
        pending[command_id] = await spawn(run_process, events, command_id, args, kwargs)
    while pending:
        event = await events.get()
        command_id, _, which, _ = event
        if which in ('exited', 'exception'):
            # That runner task is finishing; reap it.
            await pending.pop(command_id).join()
        yield event
async def main():
    """Run each line of stdin as a shell command and multiplex the output.

    Records are written as ``<command_id> <pid> <which> <payload>``.
    Output lines and 'exited' statuses go to stdout.  'exception' records
    (the command could not be started) go to stderr, with ``-`` in place of
    the pid because no process exists.
    """
    import sys

    commands = sys.stdin.buffer
    async for data, pid, which, line in run_processes(commands, shell=True):
        out = sys.stdout
        if which == 'exited' or which == 'exception':
            # Status payloads (return code / exception object) are not bytes.
            line = str(line).encode()
            if which == 'exception':
                out = sys.stderr
        # 'exception' records carry pid=None; '%i' would raise TypeError.
        pid_field = b'-' if pid is None else b'%i' % pid
        # A command's last output line may lack a trailing newline; add one
        # so the next record starts on its own line.
        if not line.endswith(b'\n'):
            line += b'\n'
        out.buffer.write(b'%i %s %s %s' % (data, pid_field, which.encode(), line))
        # Flush per record so output appears while commands are still
        # running, even when stdout/stderr is a pipe.
        out.buffer.flush()


if __name__ == '__main__':
    run(main)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Added process_multiplexer_curio.py.
In process_multiplexer.py, I renamed `run_in_thread(target, *args, **kwargs)` to `spawn(target, *args)` (so it looks like spawning a task in Curio). Note how the two scripts are identical aside from `async`/`await`, the imports, and how `main()` is called. I did not know Curio could be so amazing.