yahoo streaming benchmark distributed shell
#!/bin/bash
# Copyright 2015, Yahoo Inc.
# Licensed under the terms of the Apache License 2.0. Please see LICENSE file in the project root for terms.
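# Usage (assuming this file is saved as stream-bench.sh):
#   ./stream-bench.sh OPERATION [OPERATION ...]
# e.g. ./stream-bench.sh STORM_TEST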
# Aliases are not expanded in non-interactive bash unless expand_aliases is set
shopt -s expand_aliases
alias ssh="ssh -i ~/.ssh/id_rsa"
alias scp="scp -i ~/.ssh/id_rsa"
set -o errtrace
kafkaCluster="node13-1 node13-2 node13-3 node13-4"
zookeeperCluster="node13-1 node13-2 node13-3"
nimbusNode="node13-1"
stormCluster="node13-1 node13-2 node13-3 node13-4"
loadCluster="node13-1 node13-2 node13-3 node13-4"
round=4
LEIN=${LEIN:-lein}
KAFKA_VERSION=${KAFKA_VERSION:-"0.9.0.0"}
REDIS_VERSION=${REDIS_VERSION:-"3.0.5"}
SCALA_BIN_VERSION=${SCALA_BIN_VERSION:-"2.10"}
SCALA_SUB_VERSION=${SCALA_SUB_VERSION:-"4"}
STORM_VERSION=${STORM_VERSION:-"0.10.0"}
FLINK_VERSION=${FLINK_VERSION:-"0.10.1"}
GEARPUMP_VERSION=${GEARPUMP_VERSION:-"0.7.5"}
BENCHMARK_DIR="/root/huafengw/streaming-benchmarks"
STORM_DIR="/root/apache-storm-$STORM_VERSION"
REDIS_DIR="/root/redis-$REDIS_VERSION"
KAFKA_DIR="/root/kafka_$SCALA_BIN_VERSION-$KAFKA_VERSION"
FLINK_DIR="/root/flink-$FLINK_VERSION"
GEARPUMP_DIR="/root/gearpump-2.11-0.7.6-SNAPSHOT"
ZOOKEEPER_DIR="/root/zookeeper-3.4.6"
ZK_CONNECTIONS="node13-1:2181,node13-2:2181,node13-3:2181"
TOPIC=${TOPIC:-"ad-events"}
PARTITIONS=${PARTITIONS:-4}
LOAD=${LOAD:-17000}
CONF_FILE=./conf/localConf.yaml
TEST_TIME=${TEST_TIME:-1800}
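# All ${VAR:-default} settings above can be overridden from the environment, e.g.:
#   LOAD=10000 TEST_TIME=600 PARTITIONS=8 ./stream-bench.sh STORM_TEST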
# Print the PIDs of all processes whose command line matches $1
pid_match() {
local VAL=`ps -aef | grep "$1" | grep -v grep | awk '{print $2}'`
echo $VAL
}
# start_if_needed <pattern> <name> <sleep_secs> <command...>
# Start <command...> in the background unless a process matching <pattern> is already running
start_if_needed() {
local match="$1"
shift
local name="$1"
shift
local sleep_time="$1"
shift
local PID=`pid_match "$match"`
if [[ -n "$PID" ]];
then
echo "$name is already running..."
else
"$@" &
sleep "$sleep_time"
fi
}
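# Example (as used for Redis below):
#   start_if_needed redis-server Redis 1 "$REDIS_DIR/src/redis-server"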
# stop_if_needed <pattern> <name>
# Kill all processes matching <pattern>; escalate to SIGKILL if any survive
stop_if_needed() {
local match="$1"
local name="$2"
local PID=`pid_match "$match"`
if [[ -n "$PID" ]];
then
kill $PID
sleep 1
local CHECK_AGAIN=`pid_match "$match"`
if [[ -n "$CHECK_AGAIN" ]];
then
kill -9 $CHECK_AGAIN
fi
else
echo "No $name instance found to stop"
fi
}
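# Example: stop_if_needed redis-server Redis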
# fetch_untar_file <filename> <url>
# Download <url> into download-cache/ (unless already cached) and untar it
fetch_untar_file() {
local FILE="download-cache/$1"
local URL=$2
if [[ -e "$FILE" ]];
then
echo "Using cached file $FILE"
else
mkdir -p download-cache/
wget -O "$FILE" "$URL"
fi
tar -xzvf "$FILE"
}
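# Not called anywhere in this script; an illustrative use would be:
#   fetch_untar_file "kafka_$SCALA_BIN_VERSION-$KAFKA_VERSION.tgz" \
#     "https://archive.apache.org/dist/kafka/$KAFKA_VERSION/kafka_$SCALA_BIN_VERSION-$KAFKA_VERSION.tgz"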
# Create the benchmark kafka topic unless it already exists
create_kafka_topic() {
local count=`$KAFKA_DIR/bin/kafka-topics.sh --describe --zookeeper "$ZK_CONNECTIONS" --topic $TOPIC 2>/dev/null | grep -c $TOPIC`
if [[ "$count" = "0" ]];
then
$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper "$ZK_CONNECTIONS" --replication-factor 1 --partitions $PARTITIONS --topic $TOPIC
else
echo "Kafka topic $TOPIC already exists"
fi
}
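# Inspect the topic afterwards with:
#   $KAFKA_DIR/bin/kafka-topics.sh --describe --zookeeper "$ZK_CONNECTIONS" --topic $TOPIC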
run() {
OPERATION=$1
if [ "START_ZK" = "$OPERATION" ];
then
for node in $zookeeperCluster; do
ssh root@$node "cd $ZOOKEEPER_DIR; nohup sh bin/zkServer.sh start"
done
elif [ "STOP_ZK" = "$OPERATION" ];
then
for node in $zookeeperCluster; do
ssh root@$node "jps -Vv | grep -i 'zookeeper' | awk '{print \$1}' | xargs kill"
ssh root@$node "find /tmp/zookeeper/* ! -name 'myid' -delete"
done
elif [ "START_REDIS" = "$OPERATION" ];
then
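# Start redis locally, then run the data tool with -n to seed the benchmark setup data in redis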
start_if_needed redis-server Redis 1 "$REDIS_DIR/src/redis-server"
cd data
$LEIN run -n --configPath ../conf/benchmarkConf.yaml
cd ..
elif [ "STOP_REDIS" = "$OPERATION" ];
then
stop_if_needed redis-server Redis
rm -f dump.rdb
elif [ "START_STORM" = "$OPERATION" ];
then
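# Clean old logs on every storm node, start nimbus and the UI on the nimbus node,
# then bring up a supervisor on each node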
for node in $stormCluster; do
ssh root@$node "cd $STORM_DIR;rm -rf logs nohup*"
done
ssh root@$nimbusNode "cd $STORM_DIR;nohup bin/storm nimbus >/dev/null 2>&1 &"
ssh root@$nimbusNode "cd $STORM_DIR;nohup bin/storm ui >/dev/null 2>&1 &"
sleep 2
for node in $stormCluster; do
ssh root@$node "cd $STORM_DIR;nohup bin/storm supervisor > nohup &"
done
sleep 10
elif [ "STOP_STORM" = "$OPERATION" ];
then
ssh root@$nimbusNode "jps | grep 'nimbus' | awk '{print \$1}' | xargs kill -9"
ssh root@$nimbusNode "jps | grep 'core' | awk '{print \$1}' | xargs kill -9"
for node in $stormCluster; do
ssh root@$node "jps | grep 'supervisor' | awk '{print \$1}' | xargs kill -9"
done
elif [ "START_KAFKA" = "$OPERATION" ];
then
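# Start one broker per kafka node, then make sure the benchmark topic exists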
for node in $kafkaCluster; do
ssh root@$node "cd $KAFKA_DIR;rm -rf logs nohup;nohup bin/kafka-server-start.sh config/server.properties > nohup &"
done
create_kafka_topic
elif [ "STOP_KAFKA" = "$OPERATION" ];
then
for node in $kafkaCluster; do
ssh root@$node "jps -Vv | grep -i 'Kafka' | awk '{print \$1}' | xargs kill -9"
ssh root@$node "rm -rf /tmp/kafka-logs"
done
elif [ "START_FLINK" = "$OPERATION" ];
then
$FLINK_DIR/bin/start-cluster.sh
elif [ "STOP_FLINK" = "$OPERATION" ];
then
$FLINK_DIR/bin/stop-cluster.sh
elif [ "START_LOAD" = "$OPERATION" ];
then
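# Each round launches one generator per load node with a target rate of $LOAD events/sec (the -t flag);
# with the defaults this is 4 rounds x 4 nodes = 16 generators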
t=0
for node in $loadCluster; do
ssh root@$node "cd $BENCHMARK_DIR/data;rm -rf nohup*"
done
while [ $t -lt $round ]; do
for node in $loadCluster; do
ssh root@$node "cd $BENCHMARK_DIR/data; nohup lein run -r -t $LOAD --configPath ../$CONF_FILE >nohup$t &"
done
let t=$t+1
done
elif [ "STOP_LOAD" = "$OPERATION" ];
then
for node in $loadCluster; do
ssh root@$node "ps -aef | grep 'leiningen.core.main' | grep -v grep | awk '{print \$2}' | xargs kill"
done
sleep 5
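# With the generators stopped, gather the results from redis;
# in the upstream Yahoo benchmark 'lein run -g' writes seen.txt and updated.txt under data/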
cd data
$LEIN run -g --configPath ../$CONF_FILE || true
cd ..
elif [ "START_STORM_TOPOLOGY" = "$OPERATION" ];
then
"$STORM_DIR/bin/storm" jar ./storm-benchmarks/target/storm-benchmarks-0.1.0.jar storm.benchmark.AdvertisingTopology test-topo -conf $CONF_FILE
sleep 15
elif [ "STOP_STORM_TOPOLOGY" = "$OPERATION" ];
then
"$STORM_DIR/bin/storm" kill -w 0 test-topo || true
sleep 10
elif [ "START_GEARPUMP_APP" = "$OPERATION" ];
then
"$GEARPUMP_DIR/bin/gear" app -jar ./gearpump-benchmarks/target/gearpump-benchmarks-0.1.0.jar gearpump.benchmark.Advertising $CONF_FILE &
#"$GEARPUMP_DIR/bin/storm" -config ./conf/gear.yaml -jar ./storm-benchmarks/target/storm-benchmarks-0.1.0.jar storm.benchmark.AdvertisingTopology testtopo -conf $CONF_FILE &
sleep 5
elif [ "STOP_GEARPUMP_APP" = "$OPERATION" ];
then
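# Intentionally a no-op: this script does not kill the gearpump app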
sleep 1
elif [ "START_FLINK_PROCESSING" = "$OPERATION" ];
then
"$FLINK_DIR/bin/flink" run ./flink-benchmarks/target/flink-benchmarks-0.1.0.jar --confPath $CONF_FILE &
sleep 3
elif [ "STOP_FLINK_PROCESSING" = "$OPERATION" ];
then
FLINK_ID=`"$FLINK_DIR/bin/flink" list | grep 'Flink Streaming Job' | awk '{print $4}'; true`
if [ "$FLINK_ID" == "" ];
then
echo "Could not find streaming job to kill"
else
"$FLINK_DIR/bin/flink" cancel $FLINK_ID
sleep 3
fi
elif [ "STORM_TEST" = "$OPERATION" ];
then
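# End-to-end storm run: bring up the full stack, submit the topology,
# generate load for $TEST_TIME seconds, then tear everything down in reverse order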
run "START_ZK"
run "START_REDIS"
run "START_KAFKA"
run "START_STORM"
run "START_STORM_TOPOLOGY"
run "START_LOAD"
sleep $TEST_TIME
run "STOP_LOAD"
run "STOP_STORM_TOPOLOGY"
run "STOP_STORM"
run "STOP_KAFKA"
run "STOP_REDIS"
run "STOP_ZK"
elif [ "GEARPUMP_TEST" = "$OPERATION" ];
then
run "START_ZK"
run "START_REDIS"
run "START_KAFKA"
run "START_GEARPUMP_APP"
run "START_LOAD"
sleep $TEST_TIME
run "STOP_LOAD"
run "STOP_GEARPUMP_APP"
run "STOP_KAFKA"
run "STOP_REDIS"
run "STOP_ZK"
elif [ "FLINK_TEST" = "$OPERATION" ];
then
run "START_ZK"
run "START_REDIS"
run "START_KAFKA"
run "START_FLINK"
run "START_FLINK_PROCESSING"
run "START_LOAD"
sleep $TEST_TIME
run "STOP_LOAD"
run "STOP_FLINK_PROCESSING"
run "STOP_FLINK"
#run "STOP_KAFKA"
run "STOP_REDIS"
#run "STOP_ZK"
elif [ "STOP_ALL" = "$OPERATION" ];
then
run "STOP_LOAD"
run "STOP_GEARPUMP_APP"
run "STOP_STORM_TOPOLOGY"
run "STOP_FLINK_PROCESSING"
run "STOP_FLINK"
run "STOP_STORM"
run "STOP_KAFKA"
run "STOP_REDIS"
run "STOP_ZK"
else
if [ "HELP" != "$OPERATION" ];
then
echo "UNKOWN OPERATION '$OPERATION'"
echo
fi
echo "Supported Operations:"
echo "SETUP: download and setup dependencies for running a single node test"
echo "START_ZK: run a single node ZooKeeper instance on local host in the background"
echo "STOP_ZK: kill the ZooKeeper instance"
echo "START_REDIS: run a redis instance in the background"
echo "STOP_REDIS: kill the redis instance"
echo "START_KAFKA: run kafka in the background"
echo "STOP_KAFKA: kill kafka"
echo "START_LOAD: run kafka load generation"
echo "STOP_LOAD: kill kafka load generation"
echo "START_STORM: run storm daemons in the background"
echo "STOP_STORM: kill the storm daemons"
echo "START_FLINK: run flink processes"
echo "STOP_FLINK: kill flink processes"
echo "START_SPARK: run spark processes"
echo "STOP_SPARK: kill spark processes"
echo
echo "START_STORM_TOPOLOGY: run the storm test topology"
echo "STOP_STORM_TOPOLOGY: kill the storm test topology"
echo "START_FLINK_PROCESSING: run the flink test processing"
echo "STOP_FLINK_PROCESSSING: kill the flink test processing"
echo "START_SPARK_PROCESSING: run the spark test processing"
echo "STOP_SPARK_PROCESSSING: kill the spark test processing"
echo
echo "STORM_TEST: run storm test (assumes SETUP is done)"
echo "FLINK_TEST: run flink test (assumes SETUP is done)"
echo "SPARK_TEST: run spark test (assumes SETUP is done)"
echo "STOP_ALL: stop everything"
echo
echo "HELP: print out this message"
echo
exit 1
fi
}
if [ $# -lt 1 ];
then
run "HELP"
else
while [ $# -gt 0 ];
do
run "$1"
shift
done
fi