@lemon24
Last active July 25, 2020 08:37
process_multiplexer.py (from threads to Curio)
"""
Read commands from stdin, run them in parallel, and send the output to stdout.
Command output looks like:
<command_id> <pid> (stdout|stderr) <original_line>
When a command exits, a single line containing the status is output:
<command_id> <pid> exited <status_code>
Example:
$ cat | python3 process_multiplexer.py
echo one; sleep 2; echo two >&2
echo hello world; sleep 1; echo bye; exit 1
0 3504 stdout one
1 3509 stdout hello world
0 3504 stderr two
0 3504 exited 0
1 3509 stdout bye
1 3509 exited 1
Could be improved by showing a progress bar / summary on stderr.
Error handling could also use some improvement (it's almost non-existent).
Based on a similar (but better) tool written in Rust by
https://github.com/andreivasiliu
"""
import subprocess
from threading import Thread
from queue import Queue


def spawn(target, *args):
    thread = Thread(target=target, args=args)
    thread.start()
    return thread


def drain_file(queue, data, file):
    for line in file:
        queue.put((data, line))
    queue.put((data, None))


def run_process(queue, data, args, kwargs):
    kwargs.update(stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    try:
        process = subprocess.Popen(args, **kwargs)
    except Exception as e:
        queue.put((data, None, 'exception', e))
        return

    pid = process.pid

    line_queue = Queue()
    stdout_reader = spawn(drain_file, line_queue, 'stdout', process.stdout)
    stderr_reader = spawn(drain_file, line_queue, 'stderr', process.stderr)

    stdout_done = False
    stderr_done = False
    while not (stdout_done and stderr_done):
        which, line = line_queue.get()
        if line is None:
            if which == 'stdout':
                stdout_done = True
            elif which == 'stderr':
                stderr_done = True
            else:
                assert False, "unexpected which: %s" % which
            continue
        queue.put((data, pid, which, line))

    stdout_reader.join()
    stderr_reader.join()
    process.wait()

    queue.put((data, pid, 'exited', process.returncode))


def run_processes(commands, **kwargs):
    runners = {}
    queue = Queue()

    for id, args in enumerate(commands):
        runners[id] = spawn(run_process, queue, id, args, kwargs)

    while runners:
        id, pid, which, line = queue.get()
        if which == 'exited' or which == 'exception':
            runners.pop(id).join()
        yield id, pid, which, line


def main():
    import sys

    for data, pid, which, line in run_processes(sys.stdin.buffer, shell=True):
        end = b''
        file = sys.stdout
        if which == 'exited' or which == 'exception':
            line = str(line).encode()
            end = b'\n'
            if which == 'exception':
                file = sys.stderr
        file.buffer.write(
            b'%i %i %s %s%s' % (data, pid, which.encode(), line, end))


if __name__ == '__main__':
    main()
"""
Read commands from stdin, run them in parallel, and send the output to stdout.
Command output looks like:
<command_id> <pid> (stdout|stderr) <original_line>
When a command exits, a single line containing the status is output:
<command_id> <pid> exited <status_code>
Example:
$ cat | python3 process_multiplexer_curio.py
echo one; sleep 2; echo two >&2
echo hello world; sleep 1; echo bye; exit 1
0 3504 stdout one
1 3509 stdout hello world
0 3504 stderr two
0 3504 exited 0
1 3509 stdout bye
1 3509 exited 1
Could be improved by showing a progress bar / summary on stderr.
Error handling could also use some improvement (it's almost non-existent).
Almost identical to process_multiplexer.py, but uses Curio tasks instead of
threads.
"""
from curio import run, spawn
from curio import subprocess
from curio import Queue


async def drain_file(queue, data, file):
    async for line in file:
        await queue.put((data, line))
    await queue.put((data, None))


async def run_process(queue, data, args, kwargs):
    kwargs.update(stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    try:
        process = subprocess.Popen(args, **kwargs)
    except Exception as e:
        await queue.put((data, None, 'exception', e))
        return

    pid = process.pid

    line_queue = Queue()
    stdout_reader = await spawn(drain_file, line_queue, 'stdout', process.stdout)
    stderr_reader = await spawn(drain_file, line_queue, 'stderr', process.stderr)

    stdout_done = False
    stderr_done = False
    while not (stdout_done and stderr_done):
        which, line = await line_queue.get()
        if line is None:
            if which == 'stdout':
                stdout_done = True
            elif which == 'stderr':
                stderr_done = True
            else:
                assert False, "unexpected which: %s" % which
            continue
        await queue.put((data, pid, which, line))

    await stdout_reader.join()
    await stderr_reader.join()
    await process.wait()

    await queue.put((data, pid, 'exited', process.returncode))


async def run_processes(commands, **kwargs):
    runners = {}
    queue = Queue()

    for id, args in enumerate(commands):
        runners[id] = await spawn(run_process, queue, id, args, kwargs)

    while runners:
        id, pid, which, line = await queue.get()
        if which == 'exited' or which == 'exception':
            await runners.pop(id).join()
        yield id, pid, which, line


async def main():
    import sys

    async for data, pid, which, line in run_processes(sys.stdin.buffer, shell=True):
        end = b''
        file = sys.stdout
        if which == 'exited' or which == 'exception':
            line = str(line).encode()
            end = b'\n'
            if which == 'exception':
                file = sys.stderr
        file.buffer.write(
            b'%i %i %s %s%s' % (data, pid, which.encode(), line, end))


if __name__ == '__main__':
    run(main)
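
Since the output described in the docstrings is line-oriented, it is easy to pipe into further tools. A minimal consumer sketch (hypothetical, not part of either file above) that tallies exit statuses:

import sys
from collections import Counter

# Tally exit statuses from process_multiplexer output read on stdin.
statuses = Counter()
for raw in sys.stdin.buffer:
    parts = raw.split(b' ', 3)
    # exited lines look like: <command_id> <pid> exited <status_code>
    if len(parts) == 4 and parts[2] == b'exited':
        statuses[int(parts[3])] += 1

for status, count in sorted(statuses.items()):
    print('%d command(s) exited with status %d' % (count, status))

Usage would be something like cat commands.txt | python3 process_multiplexer.py | python3 tally_statuses.py, with tally_statuses.py being the hypothetical script above.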
lemon24 commented Mar 10, 2019

andreivasiliu's reply to my "It might be slow and errory, but I sure do like my 100 lines of Python" (the original Rust tool is ~1000 lines): https://gist.github.com/andreivasiliu/a4a2a6007d1659799b1f8057e264a61d; it reads pretty much the same.

And one with futures: https://gist.github.com/andreivasiliu/48c24633dfca466d16f78e0b430a9bf7; I kinda understand it, but it doesn't read nearly as well (I can't write Rust, but that's only half the issue).

lemon24 commented Mar 10, 2019

Added process_multiplexer_curio.py.

In process_multiplexer.py, I renamed run_in_thread(target, *args, **kwargs) to spawn(target, *args) (so it looks like spawning a task in Curio).
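
Roughly, the rename looks like this (the run_in_thread body below is a guess based on its signature; spawn is the version in the file above):

from threading import Thread

def run_in_thread(target, *args, **kwargs):    # before
    thread = Thread(target=target, args=args, kwargs=kwargs)
    thread.start()
    return thread

def spawn(target, *args):                      # after, mirrors curio.spawn(coro, *args)
    thread = Thread(target=target, args=args)
    thread.start()
    return thread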

Note how the two scripts are identical aside from async/await, the imports, and how main() is called. I did not know Curio could be so amazing.
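
For example, here's drain_file from the two files side by side; only the async/await keywords differ:

# threads version (process_multiplexer.py)
def drain_file(queue, data, file):
    for line in file:
        queue.put((data, line))
    queue.put((data, None))

# Curio version (process_multiplexer_curio.py)
async def drain_file(queue, data, file):
    async for line in file:
        await queue.put((data, line))
    await queue.put((data, None))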

lemon24 commented Jul 25, 2020

Just for completeness, here's a Reddit comment about this script and the original tool.
