Skip to content

Instantly share code, notes, and snippets.

@drkibitz
Last active September 11, 2023 03:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save drkibitz/2e75f637121dea5c584cecca2b9b36e1 to your computer and use it in GitHub Desktop.
Save drkibitz/2e75f637121dea5c584cecca2b9b36e1 to your computer and use it in GitHub Desktop.
parallel.bash
#!/usr/bin/env bash
## parallel.bash
## -------------
## A bash script that executes provided tasks in parallel, with added handling
## for errors, logs, and signals (within reason).
## ```sh
## ./parallel.bash -m 2 \
## 'echo "sleeping1"; sleep 5' \
## 'echo "sleeping2"; sleep 2' \
## 'echo "sleeping3"; sleep 3' \
## 'echo "this is a forced error" >&2; sleep 6; exit 1' \
## 'echo "sleeping4"; sleep 9' \
## 'echo "sleeping5"; sleep 4'
## ```
# Treat the expansion of an unset variable as an error instead of an empty string.
set -o nounset
# Select a UTF-8 character-type locale for this script and its children.
# NOTE(review): presumably needed because the script prints non-ASCII glyphs
# (spinner frames, status marks, the "…" ellipsis) — confirm.
export LC_CTYPE=en_US.UTF-8
#######################################
# Print the command-line synopsis and terminate the script.
# Globals:   none (reads $0 for the program name)
# Arguments: none
# Outputs:   usage text on stderr (it is only shown on invocation errors)
# Returns:   never returns; exits with status 1
#######################################
usage() {
  cat >&2 <<EOF
Usage: $0 [-d directory] [-m integer] [-i] task [task ...]
Options:
  -d directory  Directory for logs (default: a new directory under the system temp dir)
  -m integer    Maximum number of tasks that run in parallel (default: number of CPUs)
  -i            Force disable interactive mode
Arguments:
  task          Shell command to run as a task (at least one required, multiple allowed)
EOF
  exit 1
}
# Parse command-line options. The leading ':' in the option string selects
# getopts' silent mode, so unknown options and missing arguments are reported
# by the '\?' and ':' arms below rather than by getopts itself.
while getopts ':d:m:i' opt; do
  case "${opt}" in
    d)
      if [[ -z ${OPTARG} ]]; then
        printf 'Option -d requires a non-empty directory.\n' >&2
        usage
      fi
      # Normalise to exactly one trailing slash; later code appends names directly.
      LOGS_DIR="${OPTARG%/}/"
      ;;
    m)
      # Zero or a non-number would leave every task waiting forever for a slot.
      if [[ ! ${OPTARG} =~ ^[1-9][0-9]*$ ]]; then
        printf 'Option -m requires a positive integer, got: %s\n' "${OPTARG}" >&2
        usage
      fi
      PARALLEL=${OPTARG}
      ;;
    i) ALLOW_INTERACTIVE=false ;;
    \?)
      printf 'Invalid option: -%s\n' "${OPTARG}" >&2
      usage
      ;;
    :)
      printf 'Option -%s requires an argument.\n' "${OPTARG}" >&2
      usage
      ;;
  esac
done
# Drop the parsed options so that "$@" holds only the task commands.
shift $((OPTIND - 1))
# At least one task is required.
(($# > 0)) || usage
# Constants
# LOGS_DIR always ends with a single '/'. When -d was not given, a unique
# directory is created with mktemp under "${TMPDIR:-/tmp}/parallel.bash/",
# whether or not TMPDIR itself carries a trailing slash.
if [[ -z ${LOGS_DIR:-} ]]; then
  logs_root="${TMPDIR:-/tmp}"
  logs_root="${logs_root%/}/parallel.bash"
  if ! mkdir -p "${logs_root}" || ! LOGS_DIR=$(mktemp -d "${logs_root}/XXXXXXXX"); then
    printf 'Unable to create a logs directory under %s\n' "${logs_root}" >&2
    exit 1
  fi
  LOGS_DIR="${LOGS_DIR%/}/"
  unset logs_root
fi
readonly LOGS_DIR
# Each failing task appends its index here; existence of the file means "a task failed".
readonly FAILURE_FILE="${LOGS_DIR}failure.index"
# Default parallelism is the CPU count. `sysctl -n hw.ncpu` is tried first
# (macOS/BSD), then portable fallbacks; a value that is not a positive integer
# falls back to 1 so that tasks can always be scheduled.
if [[ -z ${PARALLEL:-} ]]; then
  PARALLEL=$(sysctl -n hw.ncpu 2>/dev/null) ||
    PARALLEL=$(getconf _NPROCESSORS_ONLN 2>/dev/null) ||
    PARALLEL=$(nproc 2>/dev/null) ||
    PARALLEL=1
  [[ ${PARALLEL} =~ ^[1-9][0-9]*$ ]] || PARALLEL=1
fi
declare -ir PARALLEL="${PARALLEL}"
# The value is later executed as a command (`true` / `false`).
declare -r ALLOW_INTERACTIVE="${ALLOW_INTERACTIVE:-true}"
# Mutable state
declare interactive=false # executed as a command: `true` when drawing the live view
declare -i columns=128    # width used to truncate task lines
# Exit code requested by the first trapped signal/error; 0 means "nothing trapped".
# It is initialised explicitly so that arithmetic tests such as
# ((trapped_code > 0)) stay valid under `set -o nounset`.
declare -i trapped_code=0

# Trap handlers

# EXIT: restore the cursor hidden by interactive mode and remove the failure marker.
handle_exit() {
  if $interactive; then
    printf '\e[?25h' # Show the cursor
  fi
  rm -f -- "${FAILURE_FILE}"
}

# Record the exit code to finish with. Only the first trapped code is kept.
# Arguments: $1 - exit code associated with the trapped condition
handle_trap() {
  if ((trapped_code == 0)); then
    trapped_code=$1
  fi
}
handle_main_err() { handle_trap 1; }
handle_sigint() { handle_trap 130; }  # 128 + SIGINT(2)
handle_sigterm() { handle_trap 143; } # 128 + SIGTERM(15)

trap handle_exit EXIT
trap handle_main_err ERR
trap handle_sigint INT
trap handle_sigterm TERM
#######################################
# Print a string, shortened with a trailing "…" when it is too long.
# Arguments: $1 - text to print
#            $2 - available width; at most ($2 - 2) characters of the text
#                 are kept before the ellipsis
# Outputs:   the (possibly shortened) text on stdout, without a newline
#######################################
print_truncated() {
  declare -r text=$1
  declare -i limit=$(($2 - 2))
  # A width below 2 would give a negative substring length; keep nothing instead.
  if ((limit < 0)); then
    limit=0
  fi
  if ((${#text} > limit)); then
    printf '%s…' "${text:0:limit}"
  else
    printf '%s' "$text"
  fi
}
#######################################
# Run every task, at most PARALLEL at a time, while rendering progress, and
# finish with a summary.
# Globals:   LOGS_DIR, FAILURE_FILE, PARALLEL, ALLOW_INTERACTIVE, trapped_code (read)
#            interactive, columns (written)
# Arguments: $@ - one shell command string per task
# Outputs:   progress and success summary on stdout; interruption, failure and
#            timeout summaries on stderr; per-task logs under LOGS_DIR
# Returns:   0 when every task succeeded. Otherwise the script exits with the
#            trapped code (signal/error), 1 (a task failed) or 124 (timed out).
#######################################
main() {
  # Immutable state
  declare -ar tasks=("$@") spinner=(⠋ ⠙ ⠹ ⠸ ⠼ ⠴ ⠦ ⠧ ⠇ ⠏)
  declare -ir timeout_seconds=3600 term_timeout_seconds=3 task_count=${#tasks[@]} spinner_count=${#spinner[@]} start_time=$SECONDS
  # Mutable state
  declare term_started=false term_expired=false
  declare -i executing_count=0 spinner_index=0 term_start_time=0
  declare -a remaining_indexes=() task_pids=() task_statuses=()
  # Loop counter shared (through dynamic scoping) with the nested helpers.
  declare -i i

  # Initialize state: every task is pending and has no pid yet.
  for ((i = 0; i < task_count; i++)); do
    remaining_indexes[i]=$i
    task_pids[i]=0
    task_statuses[i]="pending"
  done

  # Launch task $1 in the background, logging to LOGS_DIR/task-<index>/.
  start_task() {
    declare -ir index=$1
    declare -r task=${tasks[index]}
    declare -r logs_dir="${LOGS_DIR}task-${index}/"
    declare -r out_log="${logs_dir}out.log"
    declare -r err_log="${logs_dir}err.log"
    mkdir -p "$logs_dir"
    # Both logs start with the same header line.
    printf 'parallel.bash task:%d: %s\n\n' "$index" "$task" | tee -a "$out_log" >>"$err_log"
    # A non-zero exit appends the task index to FAILURE_FILE.
    # NOTE(review): $! below is the pid of this whole backgrounded list, so a
    # signal sent to it reaches that wrapper rather than the task's own shell;
    # confirm that cancelled tasks really stop.
    # shellcheck disable=SC2086 — the default value is split on purpose.
    ${SHELL:-/usr/bin/env bash} -c "$task" >>"$out_log" 2>>"$err_log" || printf '%d\n' "$index" >>"$FAILURE_FILE" &
    task_pids[index]=$!
    # Mark it as executing right away: a task that ends before the next poll
    # would otherwise stay "pending" and never be counted as a success.
    task_statuses[index]='executing'
    executing_count=$((executing_count + 1))
  }

  # Succeed when task index $1 has been recorded in FAILURE_FILE.
  task_failed() {
    [[ -f $FAILURE_FILE ]] && grep -qx -- "$1" "$FAILURE_FILE"
  }

  # Advance the state of task $1.
  # Returns: 0 while the task must stay in the queue, 1 once it is done with.
  process_task() {
    declare -ir index=$1
    declare -ir pid=${task_pids[index]}
    declare -r status=${task_statuses[index]}
    # not started
    if ((pid == 0)); then
      if $term_started; then
        task_statuses[index]='cancelled'
        return 1
      fi
      ((executing_count < PARALLEL)) && start_task "$index"
      return 0
    fi
    # started and still alive
    if kill -0 "$pid" 2>/dev/null; then
      if $term_started; then
        if $term_expired; then # force cancel: stop tracking it
          task_statuses[index]='cancelled'
          return 1
        elif [[ $status != cancelling ]]; then
          task_statuses[index]='cancelling'
          kill -TERM "$pid" >/dev/null 2>&1
        fi
      else
        task_statuses[index]='executing'
      fi
      return 0
    fi
    # finished
    if [[ $status == cancelling ]]; then # went away after being told to stop
      task_statuses[index]='cancelled'
    elif task_failed "$index"; then
      task_statuses[index]='failure'
    else # completed successfully
      task_statuses[index]='success'
      if ! $interactive; then
        printf '\n'
        print_task_line "$index"
      fi
    fi
    return 1
  }

  # Run process_task over every queued task and rebuild the queue.
  process_tasks() {
    declare -i index
    declare -ar remaining=("${remaining_indexes[@]}")
    remaining_indexes=()
    for index in "${remaining[@]}"; do
      if process_task "$index"; then
        remaining_indexes+=("$index")
      elif ((task_pids[index] != 0)); then
        # Only tasks that were actually started occupy a slot.
        executing_count=$((executing_count - 1))
      fi
    done
  }

  # Print " <status glyph> task:<index>[ pid:<pid>]" for task $1.
  print_task_line_prefix() {
    declare -ir index=$1
    declare -ir pid=${task_pids[index]}
    printf ' '
    case ${task_statuses[index]} in
      'pending') printf '◷' ;;
      'executing' | 'cancelling') printf '%s' "${spinner[spinner_index]}" ;;
      'success') printf '✔' ;;
      'failure') printf '✘' ;;
      'cancelled') printf '□' ;;
    esac
    printf ' task:%d' "$index"
    ((pid > 0)) && printf ' pid:%d' "$pid"
  }

  # Print the status line of task $1, truncated to the current width.
  print_task_line() {
    declare -ir index=$1
    print_truncated "$(print_task_line_prefix "$index") ─ ${tasks[index]}" "$columns"
    printf '\n'
  }

  # Print the final report. Exits the script with a non-zero status when the
  # run was interrupted, a task failed, or the run timed out; returns otherwise.
  exit_with_summary() {
    declare -a success=() cancelled=()
    declare -i count index

    # List the cancelled tasks, if any. $1 is the reason shown in the heading.
    print_cancelled() {
      count=${#cancelled[@]}
      if ((count > 0)); then
        printf ' %sCancelled remaining %d:\n' "$1" "$count"
        for index in "${cancelled[@]}"; do
          print_task_line "$index"
        done
        printf '\n'
      fi
    }

    # Show failed task $1 together with the last line of its error log.
    print_failure_summary() {
      declare -ir index=$1
      declare -r err_log="${LOGS_DIR}task-${index}/err.log"
      printf ' Failed with error:\n'
      print_task_line "$index"
      printf '\n'
      printf ' Last line of %s:\n' "$err_log"
      sed -n '$ s/^/ └── /p' "$err_log"
      printf '\n'
    }

    for ((i = 0; i < task_count; i++)); do
      case ${task_statuses[i]} in
        'success') success+=("$i") ;;
        'cancelled') cancelled+=("$i") ;;
      esac
      # Erase the live view, one task line per iteration.
      $interactive && printf "\r\033[1A\033[2K" # move up, clear line
    done
    $interactive || printf '\n'

    count=${#success[@]}
    if ((count > 0)); then
      if ((count < task_count)); then
        printf ' Completed %d successfully:\n' "$count"
      else
        printf ' All tasks completed successfully:\n'
      fi
      for index in "${success[@]}"; do
        print_task_line "$index"
      done
      printf '\n'
    fi

    # ${trapped_code:-0}: the global may have been declared without a value.
    if ((${trapped_code:-0} > 0)); then
      print_cancelled 'Interrupted! ' >&2
      exit "$trapped_code"
    elif [[ -f $FAILURE_FILE ]]; then
      print_cancelled 'Failure detected! ' >&2
      # The first recorded index is the task that failed first.
      print_failure_summary "$(head -n 1 "$FAILURE_FILE")" >&2
      exit 1
    elif $term_started; then
      # Termination without a signal or a failure can only be the timeout.
      print_cancelled 'Timed out! ' >&2
      exit 124
    fi
  }

  # The live view is used only when allowed and stdout is a terminal.
  $ALLOW_INTERACTIVE && [[ -t 1 ]] && interactive=true
  $interactive && printf '\e[?25l' # Hide the cursor
  mkdir -p "${LOGS_DIR}"
  printf 'Logs:\n└── %s\n\n' "${LOGS_DIR}"
  for ((i = 0; i < task_count; i++)); do
    print_task_line "$i"
  done

  # Main loop: redraw, detect termination conditions, then advance every task.
  while ((${#remaining_indexes[@]} > 0)); do
    if $interactive; then
      # Keep the default width when the terminal size cannot be determined.
      columns=$(tput cols) || columns=128
      ((columns > 0)) || columns=128
      printf '\033[%sA' "$task_count" # move cursor up
      for ((i = 0; i < task_count; i++)); do
        print_task_line "$i"
      done
      spinner_index=$(((spinner_index + 1) % spinner_count))
      sleep 0.05
    else
      printf '.'
      sleep 1
    fi
    # Begin termination on a trapped signal/error, on timeout, or on a task failure.
    if ! $term_started && { ((${trapped_code:-0} > 0 || SECONDS - start_time > timeout_seconds)) || [[ -f $FAILURE_FILE ]]; }; then
      term_started=true
      term_start_time=$SECONDS
    fi
    # Tasks get term_timeout_seconds to stop before they are force-cancelled.
    if ! $term_expired && { $term_started && ((SECONDS - term_start_time > term_timeout_seconds)); }; then
      term_expired=true
    fi
    process_tasks
  done
  exit_with_summary
}
# Entry point: pass the positional arguments left after option parsing to main.
main "$@"
@drkibitz
Copy link
Author

drkibitz commented Jul 3, 2023

recording

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment