Last active
January 30, 2017 18:45
-
-
Save anapsix/f94379f9f54c20baabddc5011f801023 to your computer and use it in GitHub Desktop.
Getting ELB logs from S3 via SQS queue at maximum warp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash | |
#================================================================ | |
# DESCRIPTION | |
# Script retrieves files from S3 via SQS at maximum warp. | |
#================================================================ | |
# AUTHOR Anastas Dancha (@anapsix) | |
# COPYRIGHT Copyright (c) Anastas Dancha (@anapsix) | |
# LICENCE GNU General Public License | |
#================================================================ | |
# HISTORY | |
# 2017/01/29 : anapsix : added disk check, dup messages check, | |
# staggered threads, statsD integration, | |
# locking | |
# 2017/01/25 : anapsix : fixed logging, allow custom dest | |
# 2017/01/24 : anapsix : initial release | |
#================================================================ | |
help() {
  # Print usage to stdout.
  # BUGFIX: '-m' previously claimed "defaults to 1", but the script actually
  # defaults MAX_MESSAGES to 10 (see the MAX_MESSAGES=${MAX_MESSAGES:-10}
  # export below).
  cat <<EOH
Usage: $0 [-h] [-s] -q sqs_queue_name [-m {0..10}] [-t {0..100}]
-h display this help
-s silent
-q sqs queue name
-m max_messages to retrieve from SQS between 1 and 10, defaults to 10
-t number of threads to start, defaults to "auto"
EOH
}
# Parse command-line options (util-linux getopt; supports grouped short opts).
OPTS=$(getopt -o shq:m:t: -n 'parse-options' -- "$@")
if [ $? -ne 0 ]; then echo >&2 "Failed parsing options."; help; exit 1; fi
eval set -- "$OPTS"
while true; do
  case "$1" in
    -s ) export SILENT=1; shift ;;
    # BUGFIX: the 'shift' after 'exit 0' was unreachable dead code.
    -h ) help; exit 0 ;;
    -q ) export SQS_QUEUE_NAME="$2"; shift 2 ;;
    -m ) MAX_MESSAGES="$2"; shift 2 ;;
    -t ) THREADS="$2"; shift 2 ;;
    -- ) shift; break ;;
    * ) break ;;
  esac
done
# BUGFIX: the error message referred to '-s' but the queue name flag is '-q';
# also use a plain emptiness test instead of array-length syntax on a scalar.
if [ -z "${SQS_QUEUE_NAME:-}" ]; then
  echo >&2 "-q sqs_queue_name is a required argument.."
  help
  exit 1
fi
####
# Runtime configuration. Every value below may be overridden via the
# environment before invoking the script.
export REQUIRED_BINS="aws cut jq md5sum sed bc nc"          # binaries verified at startup
export KEEP_LOGS_MTIME="${KEEP_LOGS_MTIME:-1}"              # days to keep run logs before cleanup
export MIN_DISK_MB="${MIN_DISK_MB:-1024}"                   # refuse to run with less free space than this
export MIN_DISK_INODE="${MIN_DISK_INODE:-25000}"            # refuse to run with fewer free inodes than this
export MAX_THREADS="${MAX_THREADS:-50}"                     # hard cap on worker threads in "auto" mode
export STATSD_PREFIX="${STATSD_PREFIX:-app.logsmuggler.sqs}" # metric namespace for statsD counters
export STATSD_HOST="${STATSD_HOST:-localhost}"              # statsD endpoint (UDP)
export STATSD_PORT="${STATSD_PORT:-8125}"
# Per-run state: timestamps, lock/log paths, and a random run identifier.
export NOW=$(date +%s)                                      # run start, epoch seconds (used in lockfile)
export SCRIPT_BASENAME=$(basename $0)
export SCRIPT_LOCKFILE="/tmp/${SCRIPT_BASENAME%%.*}_${SQS_QUEUE_NAME}.lock" # one lock per queue name
export SCRIPT_LOG_DIR="/tmp/${SCRIPT_BASENAME%%.*}/log"
export MESSAGE_CACHE_DIR="/tmp/${SCRIPT_BASENAME%%.*}/message_cache"        # seen message-id markers (dedup)
export RUN_ID=$(head -16 /dev/urandom | md5sum | cut -c1-17)                # random 17-char hex run id
export SCRIPT_LOG="${SCRIPT_LOG_DIR}/run_${RUN_ID}.log"
export RUN_LOG_DIR="${SCRIPT_LOG_DIR}/thread_logs_${RUN_ID}"                # per-thread log files live here
export AWS_DEFAULT_REGION=${AWS_REGION:-us-west-2}                          # aws-cli picks this up
export ELB_LOG_DIR=${ELB_LOG_DIR:-/mnt/elb_logs}                            # final destination (Splunk input dir)
export MAX_MESSAGES=${MAX_MESSAGES:-10}                                     # SQS receive batch size (1..10)
export THREADS=${THREADS:-auto}                                             # worker count, or "auto"
####
# Ensure all working directories exist before doing anything else.
# BUGFIX: the original used '( echo ...; exit 1 )' — that 'exit' only left the
# subshell, so the script carried on after a failed mkdir. Also 'echo 2>&1'
# redirected echo's (empty) stderr instead of sending the message to stderr.
for dir in ${ELB_LOG_DIR} ${SCRIPT_LOG_DIR} ${RUN_LOG_DIR} ${MESSAGE_CACHE_DIR}; do
  if ! test -d "$dir" && ! mkdir -p "$dir"; then
    echo "Failed to create $dir directory, exiting.." | tee /tmp/${RUN_ID} >&2
    exit 1
  fi
done
verbose() {
  # Emit a progress message to stderr, prefixed with the run id (and thread
  # id when inside a worker), while appending a copy to the run log.
  # Suppressed entirely when SILENT=1 (the -s flag).
  [ "${SILENT:-0}" -ne 0 ] && return 0
  echo "${RUN_ID}${THREAD_ID:+/$THREAD_ID}: $@" | tee -a ${SCRIPT_LOG} >&2
}
get_lock() {
  # Acquire the per-queue run lock (SCRIPT_LOCKFILE, format "pid:start_epoch").
  # Returns 1 (without touching the lockfile) when the recorded PID is still
  # alive; otherwise writes our own "pid:NOW" record and returns 0.
  local _lockpid _locktime
  if [ -e "${SCRIPT_LOCKFILE}" ]; then
    _lockpid=$(cut -d: -f1 "${SCRIPT_LOCKFILE}")
    _locktime=$(cut -d: -f2 "${SCRIPT_LOCKFILE}")
    if kill -0 "${_lockpid}" 2>/dev/null; then
      verbose "Refusing to run: detected an already running process for this queue (${SQS_QUEUE_NAME})"
      verbose " process ${_lockpid} have been running for over $((NOW - _locktime)) seconds"
      verbose >&2 " and started at $(date -d @${_locktime})"
      return 1
    fi
    verbose "lockfile detected, however PID ${_lockpid} is not running, ignoring old lockfile"
  fi
  # BUGFIX: the original only wrote the lockfile in an 'elif' branch taken when
  # no lockfile existed, so after ignoring a stale lock it never recorded its
  # own PID — a second instance could then start concurrently.
  if ! echo "$$:${NOW}" > "${SCRIPT_LOCKFILE}"; then
    verbose >&2 "could not create lockfile as ${SCRIPT_LOCKFILE}"
    verbose >&2 "check lockfile permissions.."
    return 1
  fi
}
# Bail out immediately if another instance already holds the queue lock.
if ! get_lock; then
  exit 1
fi
check_required() {
  # Verify that binary $1 is resolvable in PATH; report and return 1 if not.
  # Uses the POSIX 'command -v' builtin instead of the external (and less
  # portable) 'which'.
  if ! command -v "$1" >/dev/null 2>&1; then
    verbose "${1} is not found in PATH, yet required.."
    verbose "all required binaries are as follows:"
    verbose "${REQUIRED_BINS// /, }"
    return 1
  fi
}
# Abort unless every binary this script shells out to is installed.
for required_bin in ${REQUIRED_BINS}; do
  check_required "${required_bin}" || exit 1
done
check_disk() {
  # Ensure the destination filesystem has enough free space and inodes to
  # accept a batch of downloaded logs. Returns 1 when below either threshold.
  # BUGFIX: 'df -k' reports available space in KiB, but the original compared
  # that raw KiB figure against MIN_DISK_MB (megabytes) — effectively lowering
  # the threshold 1024-fold. Convert to MB before comparing.
  # BUGFIX: check the configured ${ELB_LOG_DIR} rather than a hardcoded
  # /mnt/elb_logs (they only coincide when ELB_LOG_DIR keeps its default).
  local _free_kb _free_inode _free_mb
  read _free_kb _free_inode <<<$(df -k --output=avail,iavail "${ELB_LOG_DIR}" | tail -1)
  _free_mb=$(( ${_free_kb:-0} / 1024 ))
  if [ "${_free_mb}" -lt "${MIN_DISK_MB}" ]; then
    verbose "Available disk space is less than required: ${_free_mb}MB < ${MIN_DISK_MB}MB"
    return 1
  fi
  if [ "${_free_inode:-0}" -lt "${MIN_DISK_INODE}" ]; then
    verbose "Available Inode count is less than required: ${_free_inode} < ${MIN_DISK_INODE}"
    return 1
  fi
}
# Refuse to start a run when disk space or inodes are below the thresholds.
if ! check_disk; then
  exit 1
fi
cleanup() {
  # End-of-run housekeeping: prune aged run logs and message-id markers,
  # then release the queue lock. All removal output goes to the run log.
  local retention="${KEEP_LOGS_MTIME:-1}"
  verbose "Cleaning up logs (everything over $KEEP_LOGS_MTIME days goes) and old message IDs.."
  # Message-id markers only matter for ~1 day of dedup, drop older ones.
  find ${MESSAGE_CACHE_DIR} -maxdepth 1 -type f -mtime +1 -exec rm -v {} \; >>${SCRIPT_LOG} 2>&1
  # Run logs beyond the retention window.
  find ${SCRIPT_LOG_DIR} -maxdepth 1 -type f -mtime +${retention} -exec rm -rv {} \; >>${SCRIPT_LOG} 2>&1
  rm -v ${SCRIPT_LOCKFILE} >>${SCRIPT_LOG} 2>&1
}
stats() {
  # Fire-and-forget statsD counter: increments metric $1 by $2 (default 1)
  # over UDP. Backgrounded and disowned so a slow/absent collector never
  # blocks or outlives the run.
  local metric_name="${1}"
  local increment="${2:-1}"
  printf '%s.%s:%s|c\n' "${STATSD_PREFIX}" "${metric_name}" "${increment}" | nc -w 1 -u ${STATSD_HOST} ${STATSD_PORT} &
  disown
}
####
set -e
wait_a_bit() {
  # Stagger thread start-up. With a thread index $1, sleep index*0.1 seconds;
  # without one, sleep a random fraction of a second.
  # BUGFIX: the original fallback '${:-0.$[RANDOM%99]}' is a bad substitution
  # (empty parameter name) which errors out — and under 'set -e' killed the
  # calling thread. Also computes the indexed delay with shell arithmetic
  # instead of forking 'bc'.
  if [ -n "$1" ]; then
    local tenths=$(( $1 * 10 ))
    sleep_time=$(printf '%d.%02d' $(( tenths / 100 )) $(( tenths % 100 )))
  else
    sleep_time="0.$(( RANDOM % 99 ))"
  fi
  sleep ${sleep_time}
}
check_message() {
  # Deduplicate SQS deliveries: $1 is a message id. Returns 1 (and bumps the
  # duplicate counter) when the id was already seen; otherwise records it in
  # MESSAGE_CACHE_DIR as an empty marker file and returns 0.
  # Paths are quoted defensively in case MESSAGE_CACHE_DIR ever contains
  # whitespace.
  if [ -e "${MESSAGE_CACHE_DIR}/${1}" ]; then
    stats "messages.duplicate"
    return 1
  fi
  touch "${MESSAGE_CACHE_DIR}/${1}"
}
sqs_delete_message() {
  # Delete a processed message from the queue and count the deletion.
  # $1 - SQS receipt handle of the message to delete.
  # NOTE(review): the log line reads ${message_id} from the caller's scope
  # (set in main's per-message loop), not from an argument.
  # Receipt handles are long opaque tokens — quote them defensively.
  verbose "Deleting retrieved message id ${message_id}"
  aws sqs delete-message --queue-url "${SQS_QUEUE_URL}" --receipt-handle "${1}" >>${THREAD_LOG} 2>&1 && \
  stats "messages.deleted"
}
s3_get_log() {
  # Download one log object from S3 and count the retrieval.
  # $1 - "bucket/key" path of the object; $2 - destination directory.
  # Quoted so keys containing shell-special characters don't word-split.
  aws s3 cp "s3://${1}" "${2}" >>${THREAD_LOG} 2>&1 && \
  stats "logs.received"
}
main() {
  # Worker thread body: receive up to MAX_MESSAGES from SQS, download the S3
  # objects referenced by each message into a private temp dir, delete the
  # handled messages, then move the files into ELB_LOG_DIR. Runs backgrounded;
  # $1 is the thread index, used only to stagger start-up.
  wait_a_bit $1
  # Per-thread identity and paths (exported so child processes see them).
  export THREAD_ID=$(head -16 /dev/urandom | md5sum | cut -c1-9)
  export TMP_DIR="${ELB_LOG_DIR}_tmp/${THREAD_ID}"
  export THREAD_LOG="${RUN_LOG_DIR}/thread_${THREAD_ID}.log"
  mkdir ${TMP_DIR}
  # --output text emits one tab-separated line per message; fields 2/4/5 are
  # taken as body / message-id / receipt-handle below — assumes the aws-cli
  # text layout for receive-message, TODO confirm against the CLI version used.
  messages="$(aws sqs receive-message --max-number-of-messages ${MAX_MESSAGES} --queue-url ${SQS_QUEUE_URL} --output text 2>>${THREAD_LOG})"
  # Split $messages on newlines only, so each loop iteration is one message.
  IFS=$'\n'
  verbose "Downloading ELB logs to temp dir (${TMP_DIR}/).."
  for message in $messages; do
    message_id=$(echo "$message" | cut -f4)
    receipt_handle=$(echo "$message" | cut -f5)
    # The message body (field 2) is an S3 event notification JSON document;
    # the sed strips jq's surrounding double quotes.
    bucket_name=$(echo $message | cut -f2 | jq '.Records[].s3.bucket.name' | sed 's/^"//g;s/"$//g')
    logfile=$(echo $message | cut -f2 | jq '.Records[].s3.object.key' | sed 's/^"//g;s/"$//g')
    verbose "Processing message ${message_id}.."
    stats "messages.received"
    # Skip ids already recorded in MESSAGE_CACHE_DIR (duplicate delivery);
    # check_message also records the id for future runs/threads.
    if ! ( check_message ${message_id} ); then
      verbose "Got already processed message ${message_id}, skipping download.."
      continue
    fi
    s3_get_log ${bucket_name}/${logfile} ${TMP_DIR}
    sqs_delete_message ${receipt_handle}
  done
  # Stage through TMP_DIR so the Splunk input dir only ever sees complete
  # files, then drop the per-thread temp dir.
  verbose "Moving ELB logs from temp dir to Splunk input dir (${ELB_LOG_DIR}/).."
  find ${TMP_DIR} -type f -exec mv -v {} ${ELB_LOG_DIR}/ \; >>${THREAD_LOG} 2>&1
  rm -rv ${TMP_DIR} >>${THREAD_LOG} 2>&1
}
# Driver: resolve the queue, size the thread pool, fan out workers, clean up.
verbose "Run ${RUN_ID} started.."
verbose "Discovering SQS queue url for ${SQS_QUEUE_NAME}"
export SQS_QUEUE_URL="$(aws sqs get-queue-url --queue-name ${SQS_QUEUE_NAME} --output text)"
export SQS_NUMBER_OF_MESSAGES=$(aws sqs get-queue-attributes --queue-url ${SQS_QUEUE_URL} --attribute-names ApproximateNumberOfMessages --output text | cut -f2)
verbose "${SQS_NUMBER_OF_MESSAGES} message(s) detected.."
stats "messages.queued" ${SQS_NUMBER_OF_MESSAGES}
# Each thread drains up to 10 messages per receive call, so this many threads
# cover the backlog in roughly one pass.
export THREADS_RECOMMENDED=$(( SQS_NUMBER_OF_MESSAGES / 10 + 1 ))
if [ ${SQS_NUMBER_OF_MESSAGES} -eq 0 ]; then
  verbose "Nothing to do here, exiting.."
  cleanup
  exit 0
fi
# BUGFIX: the "auto" check must precede the numeric comparison. The original
# evaluated '[ auto -gt N ]' first on every default run, spewing an
# "integer expression expected" error to stderr before falling through.
if [ "${THREADS}" == "auto" ]; then
  verbose "THREADS auto-discovery mode selected.."
  if [ ${MAX_MESSAGES} -ne 10 ]; then
    verbose "Ignoring passed max_messages, defaulting to maximum of 10"
    export MAX_MESSAGES=10
  fi
  verbose "Need ${THREADS_RECOMMENDED} thread(s) to retrieve all detected messages.."
  if [ ${THREADS_RECOMMENDED} -gt ${MAX_THREADS} ]; then
    verbose "Cowardly refusing to run more than ${MAX_THREADS} threads.."
    export THREADS=${MAX_THREADS}
  else
    export THREADS=${THREADS_RECOMMENDED}
  fi
elif [ ${THREADS} -gt ${THREADS_RECOMMENDED} ]; then
  verbose "Though $THREADS threads are requested, gonna run only ${THREADS_RECOMMENDED}.."
  export THREADS=${THREADS_RECOMMENDED}
fi
verbose "Run is proceeding with ${THREADS} thread(s).."
# Fan out the workers, wait for all of them, then release lock and prune logs.
for ((i=1;i<=${THREADS};i++)); do
  main $i &
done
wait
cleanup
verbose "Run ${RUN_ID} finished.."
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment