Last active
March 24, 2024 11:31
-
-
Save iAmGroute/5dc1d769cbd6e74ccb69b1e567bb1a6c to your computer and use it in GitHub Desktop.
scatter: easily parallelize shell commands and pipelines
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
''' | |
## What is this? | |
A simple way of parallelizing commands (including pipelines) in a shell, while preserving output order. | |
For example, we can re-write: | |
``` | |
for i in {1..5}; do | |
echo "example_host_${i}" | |
ssh "example_host_${i}" ps -e | grep something | |
done | |
``` | |
as: | |
``` | |
for i in {1..5}; do | |
scatter echo "example_host_${i}" | |
scatter ssh "example_host_${i}" ps -e \| grep something | |
done | |
gather | |
``` | |
One-liners: | |
``` | |
for i in {1..5}; do echo "example_host_${i}" ; ssh "example_host_${i}" ps -e | grep something; done
for i in {1..5}; do scatter echo "example_host_${i}" \; ssh "example_host_${i}" ps -e \| grep something; done; gather
``` | |
The commands will all run in parallel, so it will be significantly faster. | |
The output is exactly the same as the sequential run (order is preserved). | |
## Why? | |
Even small tasks can take a long time to complete if there are lots of them (especially network/IO-bound ones).
So it makes sense to run them in parallel. | |
The simplest way would be to run them in the background (`&`) and then `wait`, e.g.: `for ...; do x & done; wait`.
For some cases this is OK, but sometimes we want to get the output *in-order*. | |
We can do this cleanly, with this `scatter`/`gather` method: `for ...; do scatter x; done; gather`. | |
## What does it do? | |
1. Each `scatter` command gets a serial ID (read+increment `/tmp/scatter_$USER/scatter_id`) | |
2. Then it starts a `scatter_bg` process in the background | |
3. The background process runs the given arguments in a shell (properly escaped), capturing stdout/err in `/tmp/scatter_$USER/{id}.oe` | |
4. When the (nested/background) shell exits, it waits for the previous `scatter_bg` process to complete | |
5. Once that happens, it writes the output (captured in the file) to the original stdout (eg: terminal) | |
6. The `gather` command waits for the last one to complete (this is optional, similar to the `wait` command) | |
## How to setup? | |
Make sure you have Python 3.8+ installed (the script uses `Path.unlink(missing_ok=...)`): `python3 --version`
Run: `python3 <path_to_this_script>/scatter.py --setup` | |
This will make this script executable and create the following symlinks: | |
``` | |
$HOME/.local/bin/gather -> <path_to_this_script>/scatter.py | |
$HOME/.local/bin/scatter -> <path_to_this_script>/scatter.py | |
$HOME/.local/bin/scatter_bg -> <path_to_this_script>/scatter.py | |
``` | |
''' | |
import os | |
import sys | |
import shlex | |
import subprocess | |
from pathlib import Path | |
# Chunk size used when copying captured output back to stdout.
BLOCK_SIZE = 1 * 1024 * 1024

# Per-user state directory. Fall back to the numeric uid when $USER is unset
# (e.g. cron jobs, minimal containers) instead of crashing on `str + None`.
TMP_DIR = Path('/tmp/scatter_' + (os.getenv('USER') or str(os.getuid())))
TMP_DIR.mkdir(mode=0o700, exist_ok=True)
# /tmp is world-writable and the name above is predictable: refuse to use a
# pre-existing directory (or symlink) that another user planted under our name,
# since we create FIFOs and output files inside it.
if TMP_DIR.is_symlink() or TMP_DIR.lstat().st_uid != os.geteuid():
    raise PermissionError(f'{TMP_DIR} exists but is not a directory owned by the current user')


def init_id_file(p: Path) -> Path:
    '''
    Create the scatter ID counter file `p`, initialised to `1`, if it is missing.

    Exclusive creation (`'x'` mode, i.e. O_CREAT|O_EXCL) makes the existence
    check and the creation a single atomic step. An existing file or symlink at
    `p` (even a dangling one) is therefore never truncated or followed.

    Returns `p` unchanged, for convenient assignment.
    '''
    try:
        with open(p, 'x') as f:
            f.write('1\n')
    except FileExistsError:
        pass  # already initialised: keep the current counter value
    return p


PATH_SCATTER_ID = init_id_file(TMP_DIR / 'scatter_id')
def is_special_str(cmd: str) -> bool:
    '''
    Returns `True` if the string `cmd` is special when interpreted by BASH.
    This `cmd` is assumed to have been intentionally escaped (eg: `\\;`),
    so that it's not processed by the original shell, but the one we spawn in the background.
    Examples include:
    - `>a_file`, `<example.txt`
    - `;`, `|`, `|&`, `&>`
    - `2>&1`, `2>>err.log`, `0<input`

    The empty string is NOT special: it is an ordinary (empty) argument and
    must still be quoted, otherwise it would vanish from the command line.
    '''
    if not cmd:
        return False
    # Operators and plain redirections start with one of these characters.
    if cmd[0] in '><|&;':
        return True
    # Redirections with an explicit fd number: the text before the FIRST
    # `>`/`<` is all digits (first, so that `2>>log` is recognised too).
    for op in '><':
        fd, sep, _ = cmd.partition(op)
        if sep and fd.isdecimal():
            return True
    return False
def get_id(increment: bool):
    '''
    Return the next unassigned scatter ID stored in the counter file.

    When `increment` is true, the stored value is bumped by one before the
    file is closed, so the returned ID is reserved for the caller.
    '''
    with open(PATH_SCATTER_ID, 'r+') as counter:
        current = int(counter.read())
        if not increment:
            return current
        counter.seek(0)
        # No truncate needed: the new number is never shorter than the old one.
        counter.write(f'{current + 1}\n')
    return current
def wait_id(i: int) -> None:
    '''
    Block until the `scatter_bg` process with ID `i` has released its FIFO.

    `{i}.k` is the FIFO created by `fg_phase`. Opening it for reading blocks
    until `scatter_bg` opens its write end, which it does only after writing
    its output. If the FIFO no longer exists, the open fails and this returns
    immediately. That happens if it was already released or renamed away, or
    if ID `i` was never created (e.g. `i == 0`).
    '''
    p_k = TMP_DIR / f'{i}.k'
    try:
        with open(p_k, 'rb'):
            pass
    except OSError:
        # A missing FIFO (FileNotFoundError) means there is nothing to wait for.
        # Only OS errors are ignored, so Ctrl+C can still interrupt e.g. `gather`.
        pass
def fg_phase():
    '''
    Foreground half of `scatter`: reserve an ID and launch `scatter_bg`.

    Creates the FIFO `{id}.k` that the next command will wait on. It then
    starts `scatter_bg <id> <args...>` from this script's directory without
    waiting for it. If the launch fails, the FIFO is removed before re-raising,
    so a later waiter finds it missing instead of blocking on it.
    '''
    my_id = get_id(True)
    fifo = TMP_DIR / f'{my_id}.k'
    os.mkfifo(fifo)
    try:
        bg_argv = [Path(sys.argv[0]).with_name('scatter_bg').as_posix(), str(my_id), *sys.argv[1:]]
        subprocess.Popen(args=bg_argv)
    except BaseException:
        fifo.unlink()
        raise
def _write_all(fd: int, data: bytes) -> None:
    '''Write all of `data` to `fd`, continuing after partial writes.'''
    view = memoryview(data)
    while view:
        written = os.write(fd, view)
        view = view[written:]


def _copy_to_stdout(fd: int) -> None:
    '''Copy everything from the current offset of `fd` to stdout (fd 1).'''
    try:
        # Fast path; with offset=None sendfile advances fd's own offset.
        while os.sendfile(1, fd, None, BLOCK_SIZE):
            pass
    except OSError:
        # sendfile is not usable for every kind of stdout/platform: fall back
        # to plain read/write, resuming from wherever sendfile stopped.
        while True:
            chunk = os.read(fd, BLOCK_SIZE)
            if not chunk:
                break
            _write_all(1, chunk)


def _release_successor(p_k: Path) -> None:
    '''
    Wake whichever process is waiting on our FIFO `p_k` (if any), then remove it.

    The FIFO is first renamed to `.t`. A waiter that has not opened it yet then
    gets FileNotFoundError (and proceeds). A waiter already blocked in open()
    is woken by our opening the write end.
    '''
    p_t = p_k.with_suffix('.t')
    try:
        p_k.rename(p_t)
        # Non-blocking, so this never hangs when nobody is reading; the open
        # then fails and the error is ignored below.
        fd = os.open(p_t, os.O_WRONLY | os.O_NONBLOCK)
        os.close(fd)
    except OSError:
        pass
    finally:
        p_k.unlink(missing_ok=True)
        p_t.unlink(missing_ok=True)


def bg_phase():
    '''
    Background half of `scatter`, run as `scatter_bg <id> <args...>`.

    1. Run `<args...>` with `bash -c`, capturing stdout+stderr in `{id}.oe`.
       Each argument is shell-quoted unless `is_special_str` marks it as
       intentionally escaped shell syntax (`;`, `|`, `2>&1`, ...).
    2. Wait for the predecessor (`id - 1`), so output stays in order.
    3. Copy the captured output to stdout, then close stdout/stderr.
    4. Release the successor waiting on our FIFO `{id}.k`.

    Step 4 runs even if an earlier step raised. Otherwise every later
    `scatter_bg` (and `gather`) would block forever on our FIFO.
    '''
    my_id = int(sys.argv[1])
    cmd = sys.argv[2:]
    p_oe = TMP_DIR / f'{my_id}.oe'
    p_k = TMP_DIR / f'{my_id}.k'
    waited = False
    try:
        try:
            with open(p_oe, 'x+b') as fo:
                shcmd = ' '.join(
                    c if is_special_str(c) else shlex.quote(c)
                    for c in cmd
                )
                subprocess.run(
                    args=['bash', '-c', shcmd],
                    stdin=subprocess.DEVNULL,
                    stdout=fo,
                    stderr=fo,
                )
                wait_id(my_id - 1)
                waited = True
                fo.seek(0, os.SEEK_SET)
                _copy_to_stdout(fo.fileno())
        finally:
            p_oe.unlink(missing_ok=True)
        sys.stdout.buffer.flush()
        os.close(1)
        os.close(2)  # NOTE: we will miss any traceback from now on
    finally:
        if not waited:
            # We failed before our turn: still wait, so the successor is not
            # released ahead of our predecessor and order is kept.
            wait_id(my_id - 1)
        _release_successor(p_k)
def gather_phase():
    '''
    `gather`: block until the most recently scattered command has been output.

    The counter file stores the *next* ID to hand out, so the last ID issued
    is one less. Each `scatter_bg` waits for its predecessor before releasing
    its own FIFO, so waiting on the last ID covers all earlier ones too.
    '''
    wait_id(get_id(False) - 1)
def _setup(bin_dir: Path) -> None:
    '''
    Make this script executable and point the `scatter`, `scatter_bg` and
    `gather` symlinks in `bin_dir` at it, replacing existing entries.
    '''
    # abspath: `__file__` is relative on Python < 3.9 when run as
    # `python3 scatter.py --setup`. A relative symlink target would then be
    # resolved against `bin_dir` instead of the current directory.
    this_file = Path(os.path.abspath(__file__))
    old_mode = this_file.stat().st_mode & 0o777
    new_mode = old_mode | ((old_mode & 0o444) >> 2)  # enable 'exec' flags based on 'read' flags
    this_file.chmod(new_mode)
    print(f'Did: chmod {new_mode:o} {this_file}')
    bin_dir.mkdir(parents=True, exist_ok=True)  # ~/.local/bin may not exist yet
    for name in [
        'scatter',
        'scatter_bg',
        'gather',
    ]:
        p = bin_dir / name
        p.unlink(missing_ok=True)
        p.symlink_to(this_file)
        print(f'Did: ln -sf {this_file} {p}')


def main():
    '''
    Dispatch on the name this script was invoked as.

    `scatter`, `scatter_bg` and `gather` are the symlinks created by
    `scatter.py --setup`. `scatter.py` itself only supports `--setup` and
    otherwise prints usage. Any other name raises NameError.
    '''
    cmd_name = Path(sys.argv[0]).name
    if cmd_name == 'scatter':
        fg_phase()
    elif cmd_name == 'scatter_bg':
        bg_phase()
    elif cmd_name == 'gather':
        gather_phase()
    elif cmd_name == 'scatter.py':
        bin_dir = Path.home() / '.local' / 'bin'
        if sys.argv[1:] == ['--setup']:
            _setup(bin_dir)
        else:
            print('\n'.join([
                'Only `scatter.py --setup` is currently supported.',
                '  After setup, use the symlinks:',
                '    `scatter`',
                '    `gather`',
                '  located in: ' + repr(bin_dir.as_posix()),
                '  which should already be in $PATH',
            ]))
    else:
        raise NameError(f'Unknown command name \'{cmd_name}\', please don\'t modify symlinks.')


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
A great and more versatile alternative to this is GNU `parallel` (use the `--keep-order` argument to keep the output in order).