Last active
September 11, 2023 03:29
-
-
Save drkibitz/2e75f637121dea5c584cecca2b9b36e1 to your computer and use it in GitHub Desktop.
parallel.bash
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash
## parallel.bash
## -------------
## A bash script that executes provided tasks in parallel, with added handling
## for errors, logs, and signals (within reason).
##
## Each task argument is one shell command line; it is passed to a shell
## with `-c`, so quote every task as a single word.
## ```sh
## ./parallel.bash -m 2 \
##   'echo "sleeping1"; sleep 5' \
##   'echo "sleeping2"; sleep 2' \
##   'echo "sleeping3"; sleep 3' \
##   'echo "this is a forced error" >&2; sleep 6; exit 1' \
##   'echo "sleeping4"; sleep 9' \
##   'echo "sleeping5"; sleep 4'
## ```

# Treat expansion of an unset variable as an error.
set -o nounset
# The status glyphs and the ellipsis used in the task list are multi-byte
# UTF-8 characters; presumably this is set so that bash measures and slices
# strings by character rather than by byte.
export LC_CTYPE=en_US.UTF-8
#######################################
# Print the command-line help and terminate the script.
# Arguments: none
# Outputs:   the usage text on stdout
# Returns:   does not return; always exits with status 1
#######################################
usage() {
  cat <<EOF
Usage: $0 [-d directory] [-m integer] [-i] task [task ...]

Options:
  -d directory   Directory for logs (default: a unique temporary directory)
  -m integer     Maximum number of tasks run in parallel (default: CPU count)
  -i             Force-disable interactive mode

Arguments:
  task           Shell command line to run; at least one is required
EOF
  exit 1
}
# Parse the command-line options. The leading ':' in the option string puts
# getopts in silent mode, so unknown options and missing arguments are
# reported by the '\?' and ':' arms below instead of by getopts itself.
while getopts ':d:m:i' opt; do
  case "${opt}" in
    d)
      # An empty directory would be normalised to "/" below; reject it.
      if [[ -z ${OPTARG} ]]; then
        printf 'Option -d requires a non-empty directory.\n' >&2
        usage
      fi
      # Normalise to exactly one trailing slash; later code appends file
      # names directly to LOGS_DIR.
      LOGS_DIR="${OPTARG%/}/"
      ;;
    m)
      # Zero would keep every task pending forever, and a non-number would
      # fail later in arithmetic context, so validate here.
      if ! [[ ${OPTARG} =~ ^[1-9][0-9]*$ ]]; then
        printf 'Option -m requires a positive integer, got: %s\n' "${OPTARG}" >&2
        usage
      fi
      PARALLEL=${OPTARG}
      ;;
    i) ALLOW_INTERACTIVE=false ;;
    \?)
      printf 'Invalid option: -%s\n' "${OPTARG}" >&2
      usage
      ;;
    :)
      printf 'Option -%s requires an argument.\n' "${OPTARG}" >&2
      usage
      ;;
  esac
done
# Drop the parsed options; what remains are the task command lines.
shift $((OPTIND - 1))
# At least one task is required.
(($# > 0)) || usage
# Constants

#######################################
# Print the number of CPUs, trying the macOS/BSD sysctl key first and then
# the getconf/nproc equivalents; prints 1 when none of them yields a
# positive integer.
# Outputs: the count on stdout
#######################################
detect_cpu_count() {
  declare count
  count=$(sysctl -n hw.ncpu 2>/dev/null) ||
    count=$(getconf _NPROCESSORS_ONLN 2>/dev/null) ||
    count=$(nproc 2>/dev/null) ||
    count=1
  [[ ${count} =~ ^[1-9][0-9]*$ ]] || count=1
  printf '%s\n' "${count}"
}

# LOGS_DIR always ends in exactly one slash because file names are appended
# to it directly. When it is not supplied (via -d or the environment) a
# unique directory below the temp root is used.
if [[ -n ${LOGS_DIR:-} ]]; then
  LOGS_DIR="${LOGS_DIR%/}/"
else
  tmp_root=${TMPDIR:-/tmp}
  # Fall back to pid + random number when uuidgen is not installed.
  run_id=$(uuidgen 2>/dev/null) || run_id="$$-${RANDOM}"
  LOGS_DIR="${tmp_root%/}/parallel.bash/${run_id}/"
  unset tmp_root run_id
fi
readonly LOGS_DIR
# Tasks that exit non-zero append their index to this file.
readonly FAILURE_FILE="${LOGS_DIR}failure.index"

# Maximum number of tasks running at once. A value below 1 would leave every
# task pending forever, so it is rejected here.
if [[ -z ${PARALLEL:-} ]]; then
  PARALLEL=$(detect_cpu_count)
fi
if ! [[ ${PARALLEL} =~ ^[1-9][0-9]*$ ]]; then
  printf 'parallel.bash: parallelism must be a positive integer, got: %s\n' "${PARALLEL}" >&2
  exit 1
fi
declare -ir PARALLEL
# "true" or "false"; the value is later executed as a command.
declare -r ALLOW_INTERACTIVE=${ALLOW_INTERACTIVE:-true}
# Mutable state
declare interactive=false # "true" once the live, redrawn task list is in use
declare -i columns=128    # width used to truncate task lines
# Exit code recorded by the first trap that fired; 0 means "none yet". It is
# initialised explicitly because, under `set -o nounset`, reading a declared
# but unassigned variable in arithmetic context is an error on bash 4+.
declare -i trapped_code=0

# Trap handlers

# EXIT: restore the cursor if interactive mode hid it and remove the
# failure marker file.
handle_exit() {
  if $interactive; then
    printf '\e[?25h' # Show the cursor
  fi
  rm -f -- "${FAILURE_FILE}"
}

# Record exit code $1 unless an earlier trap already recorded one, so the
# first signal decides the script's exit status.
handle_trap() {
  ((trapped_code > 0)) || trapped_code=$1
}
handle_main_err() { handle_trap 1; }
handle_sigint() { handle_trap 130; }  # 128 + SIGINT (2)
handle_sigterm() { handle_trap 143; } # 128 + SIGTERM (15)

trap handle_exit EXIT
trap handle_main_err ERR
trap handle_sigint INT
trap handle_sigterm TERM
#######################################
# Print a string without a trailing newline, shortened to fit a given width.
# Arguments:
#   $1 - the string to print
#   $2 - the available width; at most ($2 - 2) characters of $1 are kept,
#        followed by an ellipsis when anything was cut off
# Outputs: the (possibly shortened) string on stdout
#######################################
print_truncated() {
  declare str=$1
  declare -i maxlen=$(($2 - 2))
  # A width below 2 would give a negative substring length, which is an
  # error on bash 3.2 and means "trim from the end" on bash 4.2+.
  ((maxlen >= 0)) || maxlen=0
  if ((${#str} > maxlen)); then
    str="${str:0:maxlen}…"
  fi
  printf '%s' "$str"
}
#######################################
# Run every argument as a shell command line, at most PARALLEL at a time,
# polling the background jobs and printing a status line per task.
#
# Termination starts when a trap recorded an exit code, a task failed
# (FAILURE_FILE exists) or the overall timeout elapsed: pending tasks are
# cancelled, running jobs are sent TERM and, once the grace period is over,
# KILL.
#
# Globals:
#   LOGS_DIR, FAILURE_FILE, PARALLEL, ALLOW_INTERACTIVE, trapped_code (read)
#   interactive, columns (written)
# Arguments:
#   $@ - the task command lines
# Outputs:
#   progress and the summary on stdout; interruption/failure/timeout
#   details on stderr; per-task logs under ${LOGS_DIR}task-<index>/
# Returns:
#   0 when nothing went wrong; otherwise exits with the trapped code,
#   1 when a task failed, or 124 when the overall timeout was hit.
#######################################
main() {
  # Immutable state
  declare -ar tasks=("$@") spinner=(⠋ ⠙ ⠹ ⠸ ⠼ ⠴ ⠦ ⠧ ⠇ ⠏)
  declare -ir timeout_seconds=3600 term_timeout_seconds=3 task_count=${#tasks[@]} spinner_count=${#spinner[@]} start_time=$SECONDS

  # Mutable state
  declare term_started=false term_expired=false timed_out=false
  declare -i executing_count=0 spinner_index=0 term_start_time=0 i
  declare -a remaining_indexes=() task_pids=() task_statuses=()

  # Initialize state: every task is pending and has no pid yet.
  for ((i = 0; i < task_count; i++)); do
    remaining_indexes[i]=$i
    task_pids[i]=0
    task_statuses[i]="pending"
  done

  # Launch task $1 in the background, logging stdout/stderr to its own
  # directory, and record its pid. A non-zero exit appends the task index
  # to FAILURE_FILE.
  start_task() {
    declare -ir index=$1
    declare -r task=${tasks[index]}
    declare -r logs_dir="${LOGS_DIR}task-${index}"
    declare -r out_log="${logs_dir}/out.log"
    declare -r err_log="${logs_dir}/err.log"
    # Run the task with $SHELL when set, otherwise with bash found via env.
    declare -a shell_cmd=(/usr/bin/env bash)
    if [[ -n ${SHELL:-} ]]; then
      shell_cmd=("$SHELL")
    fi

    mkdir -p "$logs_dir"
    # Both logs start with the same header line.
    printf 'parallel.bash task:%d: %s\n\n' "$index" "$task" | tee -a "$out_log" >>"$err_log"
    "${shell_cmd[@]}" -c "$task" >>"$out_log" 2>>"$err_log" || printf '%d\n' "$index" >>"$FAILURE_FILE" &
    task_pids[index]=$!
    # Mark it executing right away: a task that finishes before the next
    # poll must still be recognised as having run.
    task_statuses[index]='executing'
    ((executing_count += 1))
  }

  # Succeed when task index $1 is listed in FAILURE_FILE.
  task_failed() {
    [[ -f $FAILURE_FILE ]] && grep -qx -- "$1" "$FAILURE_FILE"
  }

  # Advance the state of task $1 by one step.
  # Returns 0 while the task still needs polling, 1 once it is done
  # (succeeded, failed or cancelled).
  process_task() {
    declare -ir index=$1
    declare -ir pid=${task_pids[index]}
    declare -r status=${task_statuses[index]}

    # Not started yet.
    if ((pid == 0)); then
      if $term_started; then
        task_statuses[index]='cancelled'
        return 1
      fi
      if ((executing_count < PARALLEL)); then
        start_task "$index"
      fi
      return 0
    fi

    # Started and the background job is still alive.
    if kill -0 "$pid" 2>/dev/null; then
      if $term_started; then
        if $term_expired; then # grace period over: force cancel
          kill -KILL "$pid" 2>/dev/null
          task_statuses[index]='cancelled'
          return 1
        elif [[ $status != cancelling ]]; then
          task_statuses[index]='cancelling'
          kill -TERM "$pid" >/dev/null 2>&1
        fi
      fi
      return 0
    fi

    # Finished.
    if [[ $status == cancelling ]]; then # cancelled gracefully
      task_statuses[index]='cancelled'
    elif $term_started || task_failed "$index"; then
      # The failure file is consulted as well because a task can fail
      # after this iteration's termination check already ran.
      task_statuses[index]='failure'
    else # completed successfully
      task_statuses[index]='success'
      if ! $interactive; then
        printf '\n'
        print_task_line "$index"
      fi
    fi
    return 1
  }

  # Run process_task over every remaining task and keep the ones that
  # still need polling.
  process_tasks() {
    declare -i index
    declare -ar remaining=("${remaining_indexes[@]}")
    remaining_indexes=()
    for index in "${remaining[@]}"; do
      if process_task "$index"; then
        remaining_indexes+=("$index")
      elif ((task_pids[index] > 0)); then
        # Only a task that was actually started occupied a slot.
        ((executing_count -= 1))
      fi
    done
  }

  # Print the status glyph, index and (once started) pid of task $1.
  print_task_line_prefix() {
    declare -ir index=$1
    declare -ir pid=${task_pids[index]}
    printf ' '
    case ${task_statuses[index]} in
      'pending') printf '◷' ;;
      'executing' | 'cancelling') printf '%s' "${spinner[spinner_index]}" ;;
      'success') printf '✔' ;;
      'failure') printf '✘' ;;
      'cancelled') printf '□' ;;
    esac
    printf ' task:%d' "$index"
    if ((pid > 0)); then
      printf ' pid:%d' "$pid"
    fi
  }

  # Print the one-line description of task $1, truncated to $columns.
  print_task_line() {
    declare -ir index=$1
    # The interactive redraw assumes one terminal row per task, so newlines
    # inside the task text are shown as spaces.
    declare -r task_text=${tasks[index]//$'\n'/ }
    print_truncated "$(print_task_line_prefix "$index") ─ ${task_text}" "$columns"
    printf '\n'
  }

  # Print the final report and exit non-zero when the run was interrupted,
  # a task failed, or the overall timeout was hit; otherwise return.
  exit_with_summary() {
    declare -a success=() cancelled=()
    declare -i count index

    # Print the cancelled tasks, if any, under a heading prefixed with $1.
    print_cancelled() {
      count=${#cancelled[@]}
      if ((count > 0)); then
        printf ' %sCancelled remaining %d:\n' "$1" "$count"
        for index in "${cancelled[@]}"; do
          print_task_line "$index"
        done
        printf '\n'
      fi
    }

    # Print failed task $1 together with the last line of its stderr log.
    print_failure_summary() {
      declare -ir index=$1
      declare -r err_log="${LOGS_DIR}task-${index}/err.log"
      printf ' Failed with error:\n'
      print_task_line "$index"
      printf '\n'
      printf ' Last line of %s:\n' "$err_log"
      sed -n '$ s/^/ └── /p' "$err_log"
      printf '\n'
    }

    for ((i = 0; i < task_count; i++)); do
      case ${task_statuses[i]} in
        'success') success+=("$i") ;;
        'cancelled') cancelled+=("$i") ;;
      esac
      # Erase the live task list, one row per task.
      if $interactive; then
        printf "\r\033[1A\033[2K" # move up, clear line
      fi
    done
    $interactive || printf '\n'

    count=${#success[@]}
    if ((count > 0)); then
      if ((count < task_count)); then
        printf ' Completed %d successfully:\n' "$count"
      else
        printf ' All tasks completed successfully:\n'
      fi
      for index in "${success[@]}"; do
        print_task_line "$index"
      done
      printf '\n'
    fi

    # ${trapped_code:-0}: stay safe under nounset even if no trap fired.
    if ((${trapped_code:-0} > 0)); then
      print_cancelled 'Interrupted! ' >&2
      exit "$trapped_code"
    elif [[ -f $FAILURE_FILE ]]; then
      print_cancelled 'Failure detected! ' >&2
      print_failure_summary "$(head -n 1 "$FAILURE_FILE")" >&2
      exit 1
    elif $timed_out; then
      # 124 is the status timeout(1) uses for a command that timed out.
      print_cancelled 'Timed out! ' >&2
      exit 124
    fi
  }

  if $ALLOW_INTERACTIVE && [[ -t 1 ]]; then
    interactive=true
  fi
  if $interactive; then
    printf '\e[?25l' # Hide the cursor
  fi
  mkdir -p "${LOGS_DIR}"
  printf 'Logs:\n└── %s\n\n' "${LOGS_DIR}"
  for ((i = 0; i < task_count; i++)); do
    print_task_line "$i"
  done

  # Main loop
  while ((${#remaining_indexes[@]} > 0)); do
    if $interactive; then
      # stderr is deliberately left attached: tput needs a terminal file
      # descriptor to query the size while stdout is captured.
      columns=$(tput cols)
      ((columns > 0)) || columns=128
      printf '\033[%sA' "$task_count" # move cursor up
      for ((i = 0; i < task_count; i++)); do
        print_task_line "$i"
      done
      spinner_index=$(((spinner_index + 1) % spinner_count))
      sleep 0.05
    else
      printf '.'
      sleep 1
    fi

    if ! $term_started; then
      if ((SECONDS - start_time > timeout_seconds)); then
        timed_out=true
      fi
      if $timed_out || ((${trapped_code:-0} > 0)) || [[ -f $FAILURE_FILE ]]; then
        term_started=true
        term_start_time=$SECONDS
      fi
    fi
    if $term_started && ! $term_expired && ((SECONDS - term_start_time > term_timeout_seconds)); then
      term_expired=true
    fi

    process_tasks
  done

  exit_with_summary
}
# Entry point: every argument left after option parsing is a task.
main "$@"
Author
drkibitz
commented
Jul 3, 2023
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment