Skip to content

Instantly share code, notes, and snippets.

@deanproctor
Last active September 11, 2020 21:00
Show Gist options
  • Save deanproctor/961ae706bb1c080fa7bec885b36f7493 to your computer and use it in GitHub Desktop.
Save deanproctor/961ae706bb1c080fa7bec885b36f7493 to your computer and use it in GitHub Desktop.
StreamSets Data Collector autoscaling automation scripts

Installation:

  1. Save the autoscale script to: /opt/streamsets-datacollector/bin/
  2. Save autoscale.conf to: /etc/sdc/
  3. Save sdc.service to: /etc/systemd/system/
  4. Run: chmod 755 /opt/streamsets-datacollector/bin/autoscale
  5. Run: sudo systemctl daemon-reload

Configuration:

  1. Edit /etc/sdc/autoscale.conf to set your config
  2. If storing authentication credentials on disk:
       a. chown the credential files to the sdc user
       b. chmod the credential files to 600
#!/bin/bash
#
# Start-up preamble: loads the autoscaling settings from
# $SDC_CONF/autoscale.conf and validates them before any action runs.
#
# Exit behaviour:
#   0  settings file missing, or AUTOSCALING_ENABLED is not "yes"
#      (autoscaling is simply skipped)
#   1  USERNAME or PASSWORD is empty after loading the settings
set -e

if [[ -f "$SDC_CONF/autoscale.conf" ]]; then
  # Quoted so a configuration directory containing whitespace or glob
  # characters is still sourced as a single path.
  # shellcheck source=/dev/null
  . "$SDC_CONF/autoscale.conf"
else
  echo "Autoscaling settings file missing $SDC_CONF/autoscale.conf"
  exit 0
fi

if [[ "$AUTOSCALING_ENABLED" != "yes" ]]; then
  echo "Autoscaling configuration disabled"
  exit 0
fi

if [[ -z "$USERNAME" || -z "$PASSWORD" ]]; then
  # This is a fatal misconfiguration, so report it on stderr.
  echo "Autoscaling Username or password missing in $SDC_CONF/autoscale.conf" >&2
  exit 1
fi
#######################################
# Log in to Control Hub and capture the SSO session token.
# Globals:
#   USERNAME, PASSWORD, SCH_URL (read)
#   sessionToken (written)
# Returns:
#   0 on success; the failing command's exit status if the login request
#   (or cookie parsing) fails; 1 if the response carried no SSO cookie.
#######################################
function get_session_token {
  local cookie_jar
  local rc=0

  # mktemp gives a private, unpredictable file instead of a fixed name in
  # /tmp that other local users could pre-create or read.
  cookie_jar=$(mktemp "${TMPDIR:-/tmp}/sch-cookie.XXXXXX")

  # login to security app
  # The JSON body is fed on stdin (-d @-) so the password never appears in
  # curl's argument list, where it would be visible in the process table.
  printf '{"userName":"%s", "password": "%s"}' "${USERNAME}" "${PASSWORD}" \
    | curl -s -S -o /dev/null -X POST -d @- \
      "${SCH_URL}/security/public-rest/v1/authentication/login" \
      --header "Content-Type:application/json" --header "X-Requested-By:SDC" \
      -c "${cookie_jar}" \
    || rc=$?

  sessionToken=""
  if (( rc == 0 )); then
    # The token is the last whitespace-separated field of the SSO cookie line.
    sessionToken=$(awk '/SSO/ { print $NF }' "${cookie_jar}") || rc=$?
  fi

  # Always remove the cookie jar, including on the failure paths.
  rm -f -- "${cookie_jar}"

  if (( rc != 0 )); then
    return "${rc}"
  fi
  if [[ -z "${sessionToken}" ]]; then
    # Without this check a rejected login would go unnoticed and every later
    # request would be sent with an empty auth token.
    echo "Control Hub login failed: no SSO session token returned by ${SCH_URL}" >&2
    return 1
  fi
}
#######################################
# Determine which port and protocol this Data Collector listens on.
# Reads https.port, then http.port, from ${SDC_CONF}/sdc.properties; an
# empty or "-1" value is treated as "not configured".
# Globals:
#   SDC_CONF (read)
#   PORT, PROTOCOL (written)
# Result:
#   https + https.port if usable, otherwise http + http.port, otherwise
#   http + 18630.
#######################################
function get_port_and_protocol {
  local props="${SDC_CONF}/sdc.properties"
  local scheme

  for scheme in https http; do
    PROTOCOL=${scheme}
    # The path is quoted so directories with whitespace work, and the dot is
    # escaped so only the literal "<scheme>.port=" key matches.
    PORT=$(grep "^${scheme}\.port=" "${props}" | cut -d '=' -f2)
    if [[ -n "${PORT}" && "${PORT}" != "-1" ]]; then
      return 0
    fi
  done

  # Neither port is configured: fall back to the default HTTP port.
  PORT=18630
}
#######################################
# Register this Data Collector with Control Hub before it starts.
# Requests a component auth token, stores it in
# ${SDC_CONF}/application-token.txt, then updates sdc.properties and
# dpm.properties so the instance starts with Control Hub enabled.
# Globals:
#   SDC_CONF, SCH_URL, ORG, BASE_URL (read)
#   sessionToken, PORT, PROTOCOL (read; set by the helper functions)
# Exits:
#   0 immediately if a non-empty token file already exists
#   1 if Control Hub does not return an auth token
#######################################
function pre_start {
  local token_file="${SDC_CONF}/application-token.txt"
  local response authToken base_url

  # Check for an existing registration first, so restarting an
  # already-registered instance does not need a Control Hub login at all.
  if [[ -s "${token_file}" ]]; then
    echo "Auth token already exists in ${SDC_CONF}/application-token.txt. SDC already registered. Exiting."
    exit 0
  fi

  get_session_token

  echo "Generating authentication token"
  response=$(curl -s -S -X PUT "${SCH_URL}/security/rest/v1/organization/${ORG}/components" \
    --header "Content-Type:application/json" --header "X-Requested-By:SDC" \
    --header "X-SS-REST-CALL:true" --header "X-SS-User-Auth-Token:${sessionToken}" \
    -d "{\"organization\": \"${ORG}\", \"componentType\" : \"dc\", \"numberOfComponents\" : 1, \"active\" : true}")

  # Print only when the field is actually present. An error response must
  # not be mangled into a bogus token: a non-empty token file makes every
  # later run believe the instance is already registered.
  authToken=$(printf '%s\n' "${response}" \
    | sed -n 's/^.*"fullAuthToken"[ ]*:[ ]*"\([^"]*\)".*$/\1/p')
  if [[ -z "${authToken}" ]]; then
    echo "Failed to generate authentication token; Control Hub response: ${response}" >&2
    exit 1
  fi

  # Persist auth token
  printf '%s\n' "${authToken}" > "${token_file}"

  get_port_and_protocol
  base_url="${PROTOCOL}://${BASE_URL}:${PORT}"
  echo "Setting sdc.base.http.url in sdc.properties to ${base_url}"
  sed -i "s|#sdc.base.http.url=.*|sdc.base.http.url=${base_url}|" "${SDC_CONF}/sdc.properties"

  echo "Enabling Control Hub"
  sed -i "s|dpm.enabled=.*|dpm.enabled=true|" "${SDC_CONF}/dpm.properties"
  sed -i "s|dpm.base.url=.*|dpm.base.url=${SCH_URL}|" "${SDC_CONF}/dpm.properties"
}
#######################################
# Finish Control Hub registration once the Data Collector is up.
# Waits for the instance to answer with an HTTP 302, then sets ACLs and
# labels on it and asks Control Hub to balance the jobs that carry any of
# the configured labels.
# Globals:
#   SCH_URL, ORG, GROUP, USERNAME, LABELS, BASE_URL, SDC_DATA (read)
#   sessionToken, PORT, PROTOCOL (read; set by the helper functions)
#######################################
function post_start {
  local status SDC_ID label job response joined jobList
  local labels_json=""
  local label_sep='","'
  local -a sch_headers=() lookup_labels=() JOBS=()

  get_session_token
  get_port_and_protocol

  # Headers shared by every Control Hub REST call below.
  sch_headers=(
    --header "Content-Type:application/json"
    --header "X-Requested-By:SDC"
    --header "X-SS-REST-CALL:true"
    --header "X-SS-User-Auth-Token:${sessionToken}"
  )

  # Wait until we see the Control Hub auth redirect
  while :; do
    # A connection failure is expected while SDC is still starting; it must
    # not abort the script, so the status is captured with "|| true".
    status=$(curl -s -S --output /dev/null -k "${PROTOCOL}://${BASE_URL}:${PORT}" -w "%{http_code}" 2>/dev/null) || true
    if [[ "${status}" == "302" ]]; then
      break
    fi
    sleep 5
  done
  sleep 10

  echo "Setting Control Hub ACLs on SDC"
  SDC_ID=$(< "${SDC_DATA}/sdc.id")
  curl -s -S -X POST "${SCH_URL}/jobrunner/rest/v1/sdc/${SDC_ID}/acl" \
    "${sch_headers[@]}" \
    --data-binary "{\"resourceId\":\"${SDC_ID}\",\"organization\":\"${ORG}\",\"permissions\":[{\"subjectId\":\"${GROUP}\",\"subjectType\":\"GROUP\",\"actions\":[\"READ\",\"WRITE\",\"EXECUTE\"]},{\"subjectId\":\"${USERNAME}\",\"subjectType\":\"USER\",\"actions\":[\"READ\",\"WRITE\",\"EXECUTE\"]}]}" --output /dev/null

  echo "Setting Labels on SDC"
  # Turn "a,b" into "a","b". An empty LABELS yields an empty JSON array
  # rather than an array holding one empty string.
  if [[ -n "${LABELS}" ]]; then
    labels_json="\"${LABELS//,/${label_sep}}\""
  fi
  curl -s -S -o /dev/null -X POST "${SCH_URL}/jobrunner/rest/v1/sdc/${SDC_ID}/updateLabels" \
    "${sch_headers[@]}" \
    -d "{\"id\": \"${SDC_ID}\", \"organization\": \"${ORG}\", \"labels\": [${labels_json}]}"

  echo "Balancing jobs"
  # Get all of the jobs matching the configured labels.
  # read -a splits on whitespace without the pathname expansion an unquoted
  # "for x in $var" would perform.
  read -r -a lookup_labels <<< "${LABELS//,/ }"
  for label in "${lookup_labels[@]}"; do
    response=$(curl -s -S "${SCH_URL}/jobrunner/rest/v1/jobs?organization=${ORG}&jobLabel=${label}" \
      "${sch_headers[@]}" \
      | python -c $'import sys, json\njobs = json.load(sys.stdin)\nfor job in jobs: print(job["id"])')
    while IFS= read -r job; do
      if [[ -n "${job}" ]]; then
        JOBS+=("${job}")
      fi
    done <<< "${response}"
  done

  # With no matching jobs the old join produced [""], i.e. a request to
  # balance a job whose id is the empty string. Skip the call instead.
  if (( ${#JOBS[@]} == 0 )); then
    echo "No jobs found for labels '${LABELS}'; nothing to balance"
    return 0
  fi
  printf -v joined '"%s",' "${JOBS[@]}"
  jobList=${joined%,}

  # Balance the jobs
  curl -s -S -o /dev/null -X POST "${SCH_URL}/jobrunner/rest/v1/jobs/balanceJobs" \
    "${sch_headers[@]}" \
    -d "[${jobList}]"
}
#######################################
# Drain and de-register this Data Collector before it stops.
# Removes its labels in Control Hub, syncs the jobs that matched those
# labels, waits until Control Hub reports zero pipelines on the instance,
# then deactivates/deletes the component and disables Control Hub locally.
# Globals:
#   SCH_URL, ORG, SDC_DATA, SDC_CONF (read)
#   sessionToken (read; set by get_session_token)
#######################################
function pre_stop {
  local SDC_ID labels label job response joined jobList remaining
  local -a sch_headers=() label_list=() JOBS=()

  get_session_token

  # Headers shared by every Control Hub REST call below.
  sch_headers=(
    --header "Content-Type:application/json"
    --header "X-Requested-By:SDC"
    --header "X-SS-REST-CALL:true"
    --header "X-SS-User-Auth-Token:${sessionToken}"
  )

  SDC_ID=$(< "${SDC_DATA}/sdc.id")

  echo "Removing Labels on SDC"
  # Get the current labels on the SDC (JSON array flattened to a
  # space-separated list)
  labels=$(curl -s -S "${SCH_URL}/jobrunner/rest/v1/sdc/${SDC_ID}/labels" \
    "${sch_headers[@]}" \
    | tr -d '["]' | tr -s ',' ' ')

  # Remove all labels from the SDC
  curl -s -S -o /dev/null -X POST "${SCH_URL}/jobrunner/rest/v1/sdc/${SDC_ID}/updateLabels" \
    "${sch_headers[@]}" \
    -d "{\"id\": \"${SDC_ID}\", \"organization\": \"${ORG}\", \"labels\": []}"

  echo "Syncing jobs"
  # Get all of the jobs matching the labels on the SDC.
  # read -a splits on whitespace without the pathname expansion an unquoted
  # "for x in $var" would perform.
  read -r -a label_list <<< "${labels}"
  for label in "${label_list[@]}"; do
    response=$(curl -s -S "${SCH_URL}/jobrunner/rest/v1/jobs?organization=${ORG}&jobLabel=${label}" \
      "${sch_headers[@]}" \
      | python -c $'import sys, json\njobs = json.load(sys.stdin)\nfor job in jobs: print(job["id"])')
    while IFS= read -r job; do
      if [[ -n "${job}" ]]; then
        JOBS+=("${job}")
      fi
    done <<< "${response}"
  done

  # Sync the jobs. With no matching jobs the old join produced [""], i.e. a
  # request to sync a job whose id is the empty string; skip the call then.
  if (( ${#JOBS[@]} > 0 )); then
    printf -v joined '"%s",' "${JOBS[@]}"
    jobList=${joined%,}
    curl -s -S -o /dev/null -X POST "${SCH_URL}/jobrunner/rest/v1/jobs/syncJobs" \
      "${sch_headers[@]}" -d "[${jobList}]"
  else
    echo "No jobs found for the labels on this SDC; nothing to sync"
  fi

  # Wait until Control Hub reports no pipelines left on this instance.
  # A failed query leaves the count empty, which simply retries.
  while :; do
    remaining=$(curl -s -S "${SCH_URL}/jobrunner/rest/v1/sdc/${SDC_ID}" \
      "${sch_headers[@]}" \
      | python -c 'import sys, json; obj = json.load(sys.stdin); print(obj["pipelinesCount"])') || true
    if [[ "${remaining}" == "0" ]]; then
      break
    fi
    sleep 5
  done

  echo "De-registering SDC from Control Hub"
  curl -s -S -o /dev/null -X POST "${SCH_URL}/security/rest/v1/organization/${ORG}/components/deactivate" \
    "${sch_headers[@]}" -d "[ \"${SDC_ID}\" ]"
  curl -s -S -o /dev/null -X POST "${SCH_URL}/security/rest/v1/organization/${ORG}/components/delete" \
    "${sch_headers[@]}" -d "[ \"${SDC_ID}\" ]"
  curl -s -S -o /dev/null -X DELETE "${SCH_URL}/jobrunner/rest/v1/sdc/${SDC_ID}" \
    "${sch_headers[@]}"

  echo "Disabling Control Hub locally"
  # Truncate (not delete) the token file; an empty file lets pre_start
  # register the instance again on the next start.
  : > "${SDC_CONF}/application-token.txt"
  sed -i "s|dpm.enabled=.*|dpm.enabled=false|" "${SDC_CONF}/dpm.properties"
}
# Entry point: run the lifecycle action named by the first argument.
# Any other (or missing) argument is silently ignored.
case "$1" in
  pre_start)
    pre_start
    ;;
  post_start)
    post_start
    ;;
  pre_stop)
    pre_stop
    ;;
esac
# Autoscaling settings. This file is sourced by the autoscale script, so it
# is evaluated as shell: every line must be a valid assignment or comment.

# Set to "no" to disable autoscaling
AUTOSCALING_ENABLED=yes
# URL to your Control Hub installation
SCH_URL=https://cloud.streamsets.com
# Name of your Control Hub organization
ORG=schdemo
# Control Hub username (must have Organization Administrator permissions)
# The path is quoted so a configuration directory containing spaces works.
USERNAME=$(< "$SDC_CONF/private/schuser")
# Control Hub password
PASSWORD=$(< "$SDC_CONF/private/schpassword")
# IP address or hostname for this instance
# -f makes an HTTP error fail the lookup instead of storing the error page
# as the address; the timeouts stop an unreachable metadata endpoint from
# stalling service start-up.
# NOTE(review): instances that enforce IMDSv2 also need a session token
# header on this request - confirm for your environment.
BASE_URL=$(curl -s -S -f --connect-timeout 2 --max-time 5 http://169.254.169.254/latest/meta-data/local-ipv4)
# The labels to apply to this instance in Control Hub
LABELS=label1,label2
# The Control Hub group to give READ/WRITE/EXECUTE permissions on this instance
GROUP=all@schdemo
# Copyright 2017 StreamSets Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# systemd unit for StreamSets Data Collector with the autoscale hooks wired
# into the start and stop phases.
[Unit]
Description=StreamSets Data Collector (SDC)
# The autoscale hooks call Control Hub over the network, so the unit is
# ordered after, and requires, network-online.target.
After=network-online.target
Requires=network-online.target
[Service]
# Run the service and all hook commands as the sdc user and group.
User=sdc
Group=sdc
LimitNOFILE=32768
# Environment shared by the streamsets launcher and the autoscale script;
# the autoscale script reads SDC_CONF and SDC_DATA.
Environment=SDC_CONF=/etc/sdc
Environment=SDC_HOME=/opt/streamsets-datacollector
Environment=SDC_LOG=/var/log/sdc
Environment=SDC_DATA=/var/lib/sdc
# Runs before SDC is launched.
ExecStartPre=/opt/streamsets-datacollector/bin/autoscale pre_start
ExecStart=/opt/streamsets-datacollector/bin/streamsets dc -verbose
# Runs after SDC has been launched.
ExecStartPost=/opt/streamsets-datacollector/bin/autoscale post_start
# Runs when the unit is stopped.
ExecStop=/opt/streamsets-datacollector/bin/autoscale pre_stop
# NOTE(review): post_start and pre_stop poll in loops with no upper bound of
# their own, so this timeout is what ultimately limits them - confirm that
# 120 seconds is long enough for your pipelines to drain.
TimeoutSec=120
RemainAfterExit=yes
[Install]
WantedBy=multi-user.target
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment