Skip to content

Instantly share code, notes, and snippets.

@iAmGroute
Last active March 24, 2024 11:31
Show Gist options
  • Save iAmGroute/5dc1d769cbd6e74ccb69b1e567bb1a6c to your computer and use it in GitHub Desktop.
Save iAmGroute/5dc1d769cbd6e74ccb69b1e567bb1a6c to your computer and use it in GitHub Desktop.
scatter: easily parallelize shell commands and pipelines
#!/usr/bin/python3
'''
## What is this?
A simple way of parallelizing commands (including pipelines) in a shell, while preserving output order.
For example, we can re-write:
```
for i in {1..5}; do
echo "example_host_${i}"
ssh "example_host_${i}" ps -e | grep something
done
```
as:
```
for i in {1..5}; do
scatter echo "example_host_${i}"
scatter ssh "example_host_${i}" ps -e \| grep something
done
gather
```
One-liners:
```
for i in {1..5}; do echo "example_host_${i}"; ssh "example_host_${i}" ps -e | grep something; done
for i in {1..5}; do scatter echo "example_host_${i}" \; ssh "example_host_${i}" ps -e \| grep something; done; gather
```
The commands will all run in parallel, so it will be significantly faster.
The output is exactly the same as the sequential run (order is preserved).
## Why?
Even small tasks can take a long time to complete if there are lots of them (especially network/IO bound).
So it makes sense to run them in parallel.
The simplest way would be to run them in the background (`&`) and then `wait`, e.g.: `for ...; do x & done; wait`.
For some cases this is OK, but sometimes we want to get the output *in-order*.
We can do this cleanly, with this `scatter`/`gather` method: `for ...; do scatter x; done; gather`.
## What does it do?
1. Each `scatter` command gets a serial ID (read+increment `/tmp/scatter_$USER/scatter_id`)
2. Then it starts a `scatter_bg` process in the background
3. The background process runs the given arguments in a shell (properly escaped), capturing stdout/err in `/tmp/scatter_$USER/{id}.oe`
4. When the (nested/background) shell exits, it waits for the previous `scatter_bg` process to complete
5. Once that happens, it writes the output (captured in the file) to the original stdout (eg: terminal)
6. The `gather` command waits for the last one to complete (this is optional, similar to the `wait` command)
## How to setup?
Make sure you have Python 3.8+ installed: `python3 --version`
Run: `python3 <path_to_this_script>/scatter.py --setup`
This will make this script executable and create the following symlinks:
```
$HOME/.local/bin/gather -> <path_to_this_script>/scatter.py
$HOME/.local/bin/scatter -> <path_to_this_script>/scatter.py
$HOME/.local/bin/scatter_bg -> <path_to_this_script>/scatter.py
```
'''
import os
import sys
import shlex
import subprocess
from pathlib import Path
# Chunk size used when copying captured output to stdout.
BLOCK_SIZE = 1 * 1024 * 1024
# Per-user working directory holding the ID counter, capture files and FIFOs.
# Fall back to the numeric UID when $USER is unset instead of crashing with a
# TypeError on string concatenation.
TMP_DIR = Path('/tmp/scatter_' + (os.getenv('USER') or str(os.getuid())))
TMP_DIR.mkdir(mode=0o700, exist_ok=True)
# /tmp is shared between users: refuse a pre-existing path that is a symlink,
# not a directory, or owned by someone else, since whoever controls it could
# read our captured output or tamper with our FIFOs.
if (TMP_DIR.is_symlink() or not TMP_DIR.is_dir()
        or TMP_DIR.lstat().st_uid != os.getuid()):
    raise PermissionError(f'{TMP_DIR} is not a directory owned by the current user')
def init_id_file(p :Path):
    '''
    Make sure the ID counter file `p` exists, creating it with the value 1
    if it does not. An existing file is never modified.
    Returns `p` unchanged, for convenient assignment.
    '''
    try:
        # 'x' = exclusive create: unlike an exists() check followed by
        # open('w'), this can never truncate a counter that another process
        # created (and possibly already advanced) in the meantime.
        with open(p, 'x') as f:
            f.write('1\n')
    except FileExistsError:
        pass
    return p
# Counter file that hands out consecutive serial IDs to `scatter` calls.
PATH_SCATTER_ID = init_id_file(TMP_DIR.joinpath('scatter_id'))
def is_special_str(cmd :str):
    '''
    Returns `True` if the string `cmd` is special when interpreted by BASH.
    This `cmd` is assumed to have been intentionally escaped (eg: `\\;`),
    so that it's not processed by the original shell, but the one we spawn in the background.
    Special strings are passed through unquoted; everything else is shell-quoted.
    Examples include:
    - `>a_file`, `<example.txt`
    - `;`, `|`, `|&`, `&>`
    - `2>&1`, `2>>err.log`, `0<input`
    The empty string is NOT special: it is an (empty) argument that must be
    quoted, otherwise it would silently disappear from the command line.
    '''
    # Operators / redirections starting the word. str.startswith is False for
    # '' (whereas `''[:1] in '><|&;'` is True, which dropped empty arguments).
    if cmd.startswith(('>', '<', '|', '&', ';')):
        return True
    # File-descriptor redirections such as `2>&1`, `2>>log`, `0<in`: the text
    # before the first `>`/`<` must be a non-empty run of digits, and the
    # operator must actually be present (str.partition returns the whole
    # string as the head when the separator is missing).
    for op in '><':
        fd_part, found, _rest = cmd.partition(op)
        if found and fd_part.isdecimal():
            return True
    return False
def get_id(increment :bool):
    '''
    Return the serial ID currently stored in `PATH_SCATTER_ID`.
    If `increment` is true, the stored value is advanced by one, reserving the
    returned ID for the caller (this is what `scatter` uses). Otherwise the
    file is only read, yielding the ID the next `scatter` would receive
    (this is what `gather` uses).
    '''
    import fcntl  # Unix-only, like os.mkfifo / os.sendfile used elsewhere in this script
    with open(PATH_SCATTER_ID, 'r+') as f:
        # Hold an exclusive lock over the whole read-modify-write: the counter
        # is shared by every scatter/gather of this user (possibly from several
        # terminals), and without the lock two callers could get the same ID.
        # The lock is released when the file is closed.
        fcntl.flock(f, fcntl.LOCK_EX)
        i = int(f.read())
        if increment:
            f.seek(0, os.SEEK_SET)
            f.write(f'{i + 1}\n')
            # No truncate() needed: the number never gets shorter.
            f.flush()
        return i
def wait_id(i :int):
    '''
    Block until job `i` has finished writing its output.

    Opening a FIFO for reading blocks until some process opens it for writing;
    `scatter_bg` does that on `<i>.k` after it has copied its output. If the
    FIFO does not exist (the job already finished and removed it, or there
    never was such a job, e.g. `i == 0`), there is nothing to wait for.
    '''
    p_k = TMP_DIR / f'{i}.k'
    try:
        with open(p_k, 'rb'):
            pass
    except OSError:
        # Typically FileNotFoundError: the FIFO is already gone. Only OS errors
        # are ignored; KeyboardInterrupt is deliberately not caught, so an
        # interrupted `gather` does not quietly report success.
        pass
def fg_phase():
    '''
    `scatter` entry point: reserve the next serial ID, create the FIFO that
    will announce that job's completion, and launch `scatter_bg` (located next
    to this script) to do the actual work. Returns without waiting for it.
    '''
    job_id = get_id(True)
    done_fifo = TMP_DIR / f'{job_id}.k'
    os.mkfifo(done_fifo)
    try:
        bg_exe = Path(sys.argv[0]).with_name('scatter_bg').as_posix()
        subprocess.Popen(args = [bg_exe, str(job_id), *sys.argv[1:]])
    except BaseException:
        # No background job exists to ever signal this FIFO; remove it so
        # that waiters don't block on it forever.
        done_fifo.unlink()
        raise
def _copy_to_stdout(fd :int):
    '''
    Copy everything from the current offset of `fd` up to EOF onto stdout (fd 1).
    Tries zero-copy `os.sendfile` first and falls back to a read/write loop if
    it raises OSError. Short writes are retried, so no output is dropped.
    '''
    try:
        # offset=None: read from fd's own offset and advance it, so the
        # fallback below resumes exactly where sendfile stopped.
        while os.sendfile(1, fd, None, BLOCK_SIZE):
            pass
    except OSError:
        while True:
            chunk = os.read(fd, BLOCK_SIZE)
            if not chunk:
                break
            view = memoryview(chunk)
            while view:
                written = os.write(1, view)
                if not written:
                    raise OSError('could not write captured output to stdout')
                view = view[written:]
def _signal_done(p_k :Path):
    '''
    Release every process blocked in `wait_id` on FIFO `p_k`, then delete it.

    The FIFO is renamed away first, so a waiter arriving later finds no
    `<id>.k` and returns at once instead of blocking forever. Processes
    already blocked opening it for reading (on the same, renamed inode) are
    released by opening it for writing. With O_NONBLOCK that open fails when
    nobody is reading, which is fine: there is then nobody to release.
    '''
    p_t = p_k.with_suffix('.t')
    try:
        p_k.rename(p_t)
        fd = os.open(p_t, os.O_WRONLY | os.O_NONBLOCK)
        os.close(fd)
    except OSError:
        pass
    finally:
        p_k.unlink(missing_ok=True)
        p_t.unlink(missing_ok=True)
def bg_phase():
    '''
    `scatter_bg` entry point; argv is: <id> <command words...>.
    1. Run the command words with `bash -c`, capturing stdout+stderr into
       `TMP_DIR/<id>.oe`. Words for which `is_special_str` is true are passed
       unquoted so bash interprets them (pipes, redirections, `;`); all other
       words are shell-quoted.
    2. Wait until job `<id> - 1` is done (`wait_id`), preserving output order.
    3. Copy the captured output to the real stdout.
    4. Always, even on error: delete the capture file, close stdout/stderr and
       signal whoever waits on job `<id>` through the `<id>.k` FIFO.
    '''
    my_id = int(sys.argv[1])
    _cmd = sys.argv[2:]
    p_oe = TMP_DIR / f'{my_id}.oe'
    p_k = TMP_DIR / f'{my_id}.k'
    try:
        with open(p_oe, 'x+b') as fo:
            shcmd = ' '.join([
                shlex.quote(c) if not is_special_str(c) else c
                for c in _cmd
            ])
            subprocess.run(
                args = ['bash', '-c', shcmd],
                stdin = subprocess.DEVNULL,
                stdout = fo,
                stderr = fo,
            )
            # Our output must not appear before the previous job's output.
            wait_id(my_id - 1)
            # The child advanced the shared file offset; rewind before copying.
            fo.seek(0, os.SEEK_SET)
            _copy_to_stdout(fo.fileno())
    finally:
        try:
            p_oe.unlink(missing_ok=True)
            sys.stdout.buffer.flush()
            os.close(1)
            os.close(2)  # NOTE: we will miss any traceback from now on
        finally:
            # Signal even if the cleanup above failed; otherwise the next job
            # and `gather` would block forever on our FIFO.
            _signal_done(p_k)
def gather_phase():
    '''
    `gather` entry point: block until the most recently started `scatter`
    job has printed its output. The counter file holds the ID the *next*
    `scatter` would get, so the last issued ID is one less. Because every job
    waits for its predecessor before printing, this also covers earlier jobs.
    '''
    last_issued = get_id(False) - 1
    wait_id(last_issued)
def main():
    '''
    Dispatch on the name this script was invoked as (the basename of argv[0]):
    `scatter` -> fg_phase, `scatter_bg` -> bg_phase, `gather` -> gather_phase.
    Invoked as `scatter.py`, the only supported argument is `--setup`. It makes
    this file executable and (re)creates the three symlinks in ~/.local/bin.
    Without `--setup`, it prints a usage hint.
    Raises NameError for any other invocation name.
    '''
    cmd_name = Path(sys.argv[0]).name
    if cmd_name == 'scatter':
        fg_phase()
    elif cmd_name == 'scatter_bg':
        bg_phase()
    elif cmd_name == 'gather':
        gather_phase()
    elif cmd_name == 'scatter.py':
        bin_dir = Path.home() / '.local' / 'bin'
        if sys.argv[1:] == ['--setup']:
            # Absolute path: on Python < 3.9 `__file__` may be relative, which
            # would create dangling relative symlinks inside bin_dir.
            this_file = Path(os.path.abspath(__file__))
            old_mode = this_file.stat().st_mode & 0o777
            new_mode = old_mode | ((old_mode & 0o444) >> 2) # enable 'exec' flags based on 'read' flags
            this_file.chmod(new_mode)
            print(f'Did: chmod {new_mode:o} {this_file}')
            # ~/.local/bin does not exist on every system; symlink_to would fail.
            bin_dir.mkdir(parents=True, exist_ok=True)
            for name in [
                'scatter',
                'scatter_bg',
                'gather',
            ]:
                p = bin_dir / name
                p.unlink(missing_ok=True)
                p.symlink_to(this_file)
                print(f'Did: ln -sf {this_file} {p}')
        else:
            print('\n'.join([
                'Only `scatter.py --setup` is currently supported.',
                ' After setup, use the symlinks:',
                ' `scatter`',
                ' `gather`',
                ' located in: ' + repr(bin_dir.as_posix()),
                ' which should already be in $PATH',
            ]))
    else:
        raise NameError(f'Unknown command name \'{cmd_name}\', please don\'t modify symlinks.')
# Script entry point: behaviour is selected by the name this file was invoked
# under (see main()).
if __name__ == '__main__':
    main()
@iAmGroute
Copy link
Author

A great and more versatile alternative to this is GNU parallel.
(use the --keep-order argument to keep the output in-order)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment