Getting ELB logs from S3 via SQS queue at maximum warp
#!/usr/bin/env bash
#================================================================
# DESCRIPTION
# Script retrieves files from S3 via SQS at maximum warp.
#================================================================
# AUTHOR Anastas Dancha (@anapsix)
# COPYRIGHT Copyright (c) Anastas Dancha (@anapsix)
# LICENCE GNU General Public License
#================================================================
# HISTORY
# 2017/01/29 : anapsix : added disk check, dup messages check,
# staggered threads, statsD integration,
# locking
# 2017/01/25 : anapsix : fixed logging, allow custom dest
# 2017/01/24 : anapsix : initial release
#================================================================
help() {
cat <<EOH
Usage: $0 [-h] [-s] -q sqs_queue_name [-m {1..10}] [-t threads|auto]
-h display this help
-s silent
-q sqs queue name
-m max_messages to retrieve from SQS per call, between 1 and 10, defaults to 10
-t number of threads to start, defaults to "auto"
EOH
}
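# Example invocations (script and queue names below are hypothetical):
#   ./logsmuggler.sh -q my-elb-logs-queue
#   ./logsmuggler.sh -s -q my-elb-logs-queue -m 10 -t 20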
OPTS=$(getopt -o shq:m:t: -n 'parse-options' -- "$@")
if [ $? != 0 ]; then echo >&2 "Failed parsing options."; help; exit 1; fi
eval set -- "$OPTS"
while true; do
case "$1" in
-s ) export SILENT=1; shift ;;
-h ) help; exit 0; shift ;;
-q ) export SQS_QUEUE_NAME="$2"; shift; shift ;;
-m ) MAX_MESSAGES="$2"; shift; shift ;;
-t ) THREADS="$2"; shift; shift ;;
-- ) shift; break ;;
* ) break ;;
esac
done
if [ -z "${SQS_QUEUE_NAME}" ]; then
echo >&2 "-q sqs_queue_name is a required argument.."
help
exit 1
fi
####
export REQUIRED_BINS="aws cut jq md5sum sed bc nc"
export KEEP_LOGS_MTIME="${KEEP_LOGS_MTIME:-1}"
export MIN_DISK_MB="${MIN_DISK_MB:-1024}"
export MIN_DISK_INODE="${MIN_DISK_INODE:-25000}"
export MAX_THREADS="${MAX_THREADS:-50}"
export STATSD_PREFIX="${STATSD_PREFIX:-app.logsmuggler.sqs}"
export STATSD_HOST="${STATSD_HOST:-localhost}"
export STATSD_PORT="${STATSD_PORT:-8125}"
export NOW=$(date +%s)
export SCRIPT_BASENAME=$(basename $0)
export SCRIPT_LOCKFILE="/tmp/${SCRIPT_BASENAME%%.*}_${SQS_QUEUE_NAME}.lock"
export SCRIPT_LOG_DIR="/tmp/${SCRIPT_BASENAME%%.*}/log"
export MESSAGE_CACHE_DIR="/tmp/${SCRIPT_BASENAME%%.*}/message_cache"
export RUN_ID=$(head -16 /dev/urandom | md5sum | cut -c1-17)
export SCRIPT_LOG="${SCRIPT_LOG_DIR}/run_${RUN_ID}.log"
export RUN_LOG_DIR="${SCRIPT_LOG_DIR}/thread_logs_${RUN_ID}"
export AWS_DEFAULT_REGION=${AWS_REGION:-us-west-2}
export ELB_LOG_DIR=${ELB_LOG_DIR:-/mnt/elb_logs}
export MAX_MESSAGES=${MAX_MESSAGES:-10}
export THREADS=${THREADS:-auto}
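# Any of the defaults above can be overridden via the environment, e.g.
# (host, region and paths below are hypothetical):
#   STATSD_HOST=statsd.internal MIN_DISK_MB=4096 AWS_REGION=us-east-1 \
#     ELB_LOG_DIR=/srv/elb_logs ./logsmuggler.sh -q my-elb-logs-queue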
####
for dir in ${ELB_LOG_DIR} ${SCRIPT_LOG_DIR} ${RUN_LOG_DIR} ${MESSAGE_CACHE_DIR}; do
test -d $dir || mkdir -p $dir || { echo "Failed to create $dir directory, exiting.." | tee /tmp/${RUN_ID} >&2; exit 1; }
done
verbose() {
if [ ${SILENT:-0} -eq 0 ]; then
echo "${RUN_ID}${THREAD_ID:+/$THREAD_ID}: $@" | tee -a ${SCRIPT_LOG} >&2
fi
}
get_lock() {
if [ -e ${SCRIPT_LOCKFILE} ]; then
_lockpid=$(cut -d: -f1 ${SCRIPT_LOCKFILE})
_locktime=$(cut -d: -f2 ${SCRIPT_LOCKFILE})
if kill -0 ${_lockpid} 2>/dev/null; then
verbose "Refusing to run: detected an already running process for this queue (${SQS_QUEUE_NAME})"
verbose " process ${_lockpid} has been running for over $[$NOW-${_locktime}] seconds"
verbose " and started at $(date -d @${_locktime})"
return 1
else
verbose "lockfile detected, however PID ${_lockpid} is not running, ignoring old lockfile"
fi
fi
if ! ( echo "$$:${NOW}" > ${SCRIPT_LOCKFILE} ); then
verbose "could not create lockfile as ${SCRIPT_LOCKFILE}"
verbose "check lockfile permissions.."
return 1
fi
}
get_lock || exit 1
check_required() {
if ! ( which $1 >/dev/null 2>&1 ); then
verbose "${1} is not found in PATH, yet required.."
verbose "all required binaries are as follows:"
verbose "${REQUIRED_BINS// /, }"
return 1
fi
}
for bin in ${REQUIRED_BINS}; do check_required $bin || exit 1; done
check_disk() {
read _free_mb _free_inode <<<$(df --block-size=1M --output=avail,iavail ${ELB_LOG_DIR} | tail -1)
if [ ${_free_mb:-0} -lt ${MIN_DISK_MB} ]; then
verbose "Available disk space is less than required: ${_free_mb}MB < ${MIN_DISK_MB}MB"
return 1
fi
if [ ${_free_inode:-0} -lt ${MIN_DISK_INODE} ]; then
verbose "Available Inode count is less than required: ${_free_inode} < ${MIN_DISK_INODE}"
return 1
fi
}
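# check_disk parses the last line of "df --block-size=1M --output=avail,iavail ${ELB_LOG_DIR}",
# e.g. an output line of "51200 3276800" (illustrative numbers) means ~51200MB and
# ~3276800 inodes free, which clears the default 1024MB / 25000 inode thresholds.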
check_disk || exit 1
cleanup() {
verbose "Cleaning up logs (everything over $KEEP_LOGS_MTIME days goes) and old message IDs.."
find ${SCRIPT_LOG_DIR} -maxdepth 1 -type f -mtime +${KEEP_LOGS_MTIME:-1} -exec rm -rv {} \; >>${SCRIPT_LOG} 2>&1
find ${MESSAGE_CACHE_DIR} -maxdepth 1 -type f -mtime +1 -exec rm -v {} \; >>${SCRIPT_LOG} 2>&1
rm -v ${SCRIPT_LOCKFILE} >>${SCRIPT_LOG} 2>&1
}
stats() {
_metric="${1}"
_count="${2:-1}"
echo "${STATSD_PREFIX}.${_metric}:${_count}|c" | nc -w 1 -u ${STATSD_HOST} ${STATSD_PORT} &
disown
}
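# stats emits plain statsd counters over UDP, e.g.
#   stats "messages.received" 3
# sends "app.logsmuggler.sqs.messages.received:3|c" (with the default STATSD_PREFIX)
# to ${STATSD_HOST}:${STATSD_PORT}.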
####
set -e
wait_a_bit() {
if [ -n "$1" ]; then
sleep_time=$(echo "scale=2; $[$1*10]/100" | bc -l)
else
sleep_time="0.$[RANDOM%99]"
fi
sleep ${sleep_time}
}
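# wait_a_bit staggers thread start-up: thread N sleeps N*10/100 seconds
# (e.g. thread 3 sleeps .30s); with no argument it sleeps a random 0.xx seconds.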
check_message() {
if [ -e ${MESSAGE_CACHE_DIR}/${1} ]; then
stats "messages.duplicate"
return 1
fi
touch ${MESSAGE_CACHE_DIR}/${1}
}
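# check_message de-duplicates SQS deliveries: the first time a message id is seen,
# an empty marker file is created under ${MESSAGE_CACHE_DIR}; a re-delivered id
# returns non-zero and gets counted as "messages.duplicate".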
sqs_delete_message() {
verbose "Deleting retrieved message id ${message_id}"
aws sqs delete-message --queue-url ${SQS_QUEUE_URL} --receipt-handle ${1} >>${THREAD_LOG} 2>&1 && \
stats "messages.deleted"
}
s3_get_log() {
aws s3 cp s3://$1 $2 >>${THREAD_LOG} 2>&1 && \
stats "logs.received"
}
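# s3_get_log copies a single S3 object into the thread's temp dir, e.g.
# (bucket and key below are hypothetical):
#   s3_get_log my-elb-logs/AWSLogs/123456789012/elasticloadbalancing/us-west-2/2017/01/29/some.log ${TMP_DIR}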
main() {
wait_a_bit $1
export THREAD_ID=$(head -16 /dev/urandom | md5sum | cut -c1-9)
export TMP_DIR="${ELB_LOG_DIR}_tmp/${THREAD_ID}"
export THREAD_LOG="${RUN_LOG_DIR}/thread_${THREAD_ID}.log"
mkdir -p ${TMP_DIR}
messages="$(aws sqs receive-message --max-number-of-messages ${MAX_MESSAGES} --queue-url ${SQS_QUEUE_URL} --output text 2>>${THREAD_LOG})"
IFS=$'\n'
verbose "Downloading ELB logs to temp dir (${TMP_DIR}/).."
for message in $messages; do
message_id=$(echo "$message" | cut -f4)
receipt_handle=$(echo "$message" | cut -f5)
bucket_name=$(echo "$message" | cut -f2 | jq -r '.Records[].s3.bucket.name')
logfile=$(echo "$message" | cut -f2 | jq -r '.Records[].s3.object.key')
verbose "Processing message ${message_id}.."
stats "messages.received"
if ! ( check_message ${message_id} ); then
verbose "Got already processed message ${message_id}, skipping download.."
continue
fi
s3_get_log ${bucket_name}/${logfile} ${TMP_DIR}
sqs_delete_message ${receipt_handle}
done
verbose "Moving ELB logs from temp dir to Splunk input dir (${ELB_LOG_DIR}/).."
find ${TMP_DIR} -type f -exec mv -v {} ${ELB_LOG_DIR}/ \; >>${THREAD_LOG} 2>&1
rm -rv ${TMP_DIR} >>${THREAD_LOG} 2>&1
}
verbose "Run ${RUN_ID} started.."
verbose "Discovering SQS queue url for ${SQS_QUEUE_NAME}"
export SQS_QUEUE_URL="$(aws sqs get-queue-url --queue-name ${SQS_QUEUE_NAME} --output text)"
export SQS_NUMBER_OF_MESSAGES=$(aws sqs get-queue-attributes --queue-url ${SQS_QUEUE_URL} --attribute-names ApproximateNumberOfMessages --output text | cut -f2)
verbose "${SQS_NUMBER_OF_MESSAGES} message(s) detected.."
stats "messages.queued" ${SQS_NUMBER_OF_MESSAGES}
export THREADS_RECOMMENDED=$[$[$SQS_NUMBER_OF_MESSAGES/10]+1]
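# A single receive-message call returns at most 10 messages, so one thread per 10
# queued messages (plus one) covers the backlog, e.g. 37 messages -> 37/10+1 = 4 threads.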
if [ ${SQS_NUMBER_OF_MESSAGES} -eq 0 ]; then
verbose "Nothing to do here, exiting.."
cleanup
exit 0
elif [ "${THREADS}" != "auto" ] && [ ${THREADS} -gt ${THREADS_RECOMMENDED} ]; then
verbose "Though $THREADS threads are requested, gonna run only ${THREADS_RECOMMENDED}.."
export THREADS=${THREADS_RECOMMENDED}
fi
if [ "${THREADS}" == "auto" ]; then
verbose "THREADS auto-discovery mode selected.."
if [ ${MAX_MESSAGES} -ne 10 ]; then
verbose "Ignoring passed max_messages, defaulting to maximum of 10"
export MAX_MESSAGES=10
fi
verbose "Need ${THREADS_RECOMMENDED} thread(s) to retrieve all detected messages.."
if [ ${THREADS_RECOMMENDED} -gt ${MAX_THREADS} ]; then
verbose "Cowardly refusing to run more than ${MAX_THREADS} threads.."
export THREADS=${MAX_THREADS}
else
export THREADS=${THREADS_RECOMMENDED}
fi
fi
verbose "Run is proceeding with ${THREADS} thread(s).."
for ((i=1;i<=${THREADS};i++)); do
main $i &
done
wait
cleanup
verbose "Run ${RUN_ID} finished.."