Spark Extended Shell
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Shell script for starting the Spark Shell REPL
# Note that it will set MASTER to spark://${SPARK_MASTER_IP}:${SPARK_MASTER_PORT}
# if those two env vars are set in spark-env.sh but MASTER is not.
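#
# For example (master URLs are illustrative):
#   MASTER=spark://localhost:7077 ./spark-shell   # master taken from the environment
#   ./spark-shell -m local                        # master given via -m/--master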
cygwin=false
case "$(uname)" in
  CYGWIN*) cygwin=true;;
esac
# Enter posix mode for bash
set -o posix
## Global script variables
FWDIR="$(cd `dirname $0`/..; pwd)"
VERBOSE=0
DRY_RUN=0 # parsed from --dry-run but not otherwise consumed by this script
SPARK_REPL_OPTS="${SPARK_REPL_OPTS:-""}"
MASTER=""
# CLI Color Templates
txtund=$(tput sgr 0 1) # Underline
txtbld=$(tput bold) # Bold
bldred=${txtbld}$(tput setaf 1) # red
bldyel=${txtbld}$(tput setaf 3) # yellow
bldblu=${txtbld}$(tput setaf 4) # blue
bldwht=${txtbld}$(tput setaf 7) # white
txtrst=$(tput sgr0) # Reset
info=${bldwht}*${txtrst} # Feedback
pass=${bldblu}*${txtrst}
warn=${bldred}*${txtrst}
ques=${bldblu}?${txtrst}
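# These markers are available to prefix feedback lines, e.g.:
#   echo -e "${info} checking environment"
#   echo -e "${warn} something needs attention"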
# Helper function to describe the script usage
function usage() {
  cat << EOF
${txtbld}Usage${txtrst}: spark-shell [OPTIONS]

${txtbld}OPTIONS${txtrst}:

${txtund}Basic${txtrst}:

  -h  --help            : Print this help information.
  -c  --executor-cores  : The maximum number of cores to be used by the Spark Shell.
  -em --executor-memory : The memory used by each executor of the Spark Shell, the number
                          is followed by m for megabytes or g for gigabytes, e.g. "1g".
  -dm --driver-memory   : The memory used by the Spark Shell, the number is followed
                          by m for megabytes or g for gigabytes, e.g. "1g".

${txtund}Soon to be deprecated${txtrst}:

  --cores               : please use -c/--executor-cores

${txtund}Other options${txtrst}:

  -mip --master-ip      : The Spark Master IP/hostname.
  -mp  --master-port    : The Spark Master port.
  -m   --master         : A full string that describes the Spark Master, e.g. "local" or "spark://localhost:7077".
  -ld  --local-dir      : The absolute path to a local directory that will be used for "scratch" space in Spark.
  -dh  --driver-host    : Hostname or IP address for the driver to listen on.
  -dp  --driver-port    : The port for the driver to listen on.
  -uip --ui-port        : The port for your application's dashboard, which shows memory and workload data.
  --parallelism         : The default number of tasks to use across the cluster for distributed shuffle operations.
  --locality-wait       : The number of milliseconds to wait to launch a data-local task before giving up.
  --schedule-fair       : Enables FAIR scheduling between jobs submitted to the same SparkContext.
  --max-failures        : The number of individual task failures before giving up on the job.
  --log-conf            : Enables logging of the supplied SparkConf as INFO at start of the Spark Context.
  --mesos-coarse        : Enables coarse-grained execution in Apache Mesos; this will launch only one long-running
                          Spark task on each Mesos machine, and dynamically schedule its own "mini-tasks" within it.

e.g.
  spark-shell -m spark://localhost:7077 -dm 512m -em 2g -uip 4010
or
  spark-shell -m local -ld /tmp -dh 127.0.0.1 -dp 4001 -uip 4010 --parallelism 10 --locality-wait 500 --schedule-fair --max-failures 100
EOF
}
function out_error(){
  echo -e "${txtund}${bldred}ERROR${txtrst}: $1"
  usage
  exit 1
}
function log_info(){
  [ $VERBOSE -eq 1 ] && echo -e "${bldyel}INFO${txtrst}: $1"
}
function log_warn(){
  echo -e "${txtund}${bldyel}WARN${txtrst}: $1"
}
# PATTERNS used to validate more than one optional arg.
ARG_FLAG_PATTERN="^-"
MEM_PATTERN="^[0-9]+[mgMG]$" # note: inside [...] a "|" is literal, so [m|g|M|G] would also accept "512|"
NUM_PATTERN="^[0-9]+$"
PORT_PATTERN="^[0-9]+$"
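# e.g. "512m" and "2G" satisfy MEM_PATTERN, while "512" (no unit) does not;
# NUM_PATTERN and PORT_PATTERN accept any non-negative integer such as "4010".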
# Setters for optional args.
function set_cores(){
  CORE_PATTERN="^[0-9]+$"
  if [[ "$1" =~ $CORE_PATTERN ]]; then
    SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Dspark.cores.max=$1"
  else
    out_error "wrong format for $2"
  fi
}

function set_em(){
  if [[ $1 =~ $MEM_PATTERN ]]; then
    SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Dspark.executor.memory=$1"
  else
    out_error "wrong format for $2"
  fi
}

function set_dm(){
  if [[ $1 =~ $MEM_PATTERN ]]; then
    export SPARK_DRIVER_MEMORY=$1
  else
    out_error "wrong format for $2"
  fi
}

function set_localdir(){
  # Anchored so only absolute paths are accepted.
  LOCAL_DIR_PATTERN="^/.+"
  if [[ "$1" =~ $LOCAL_DIR_PATTERN ]]; then
    SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Dspark.local.dir=$1"
  else
    out_error "wrong format for $2"
  fi
}

function set_driver_host(){
  if ! [[ "$1" =~ $ARG_FLAG_PATTERN ]]; then
    SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Dspark.driver.host=$1"
  else
    out_error "wrong format for $2"
  fi
}

function set_driver_port(){
  if [[ $1 =~ $PORT_PATTERN ]]; then
    SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Dspark.driver.port=$1"
  else
    out_error "wrong format for $2"
  fi
}

function set_uip(){
  if [[ $1 =~ $PORT_PATTERN ]]; then
    SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Dspark.ui.port=$1"
  else
    out_error "wrong format for $2"
  fi
}

function set_parallelism(){
  if [[ $1 =~ $NUM_PATTERN ]]; then
    SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Dspark.default.parallelism=$1"
  else
    out_error "wrong format for $2"
  fi
}

function set_locality_wait(){
  if [[ $1 =~ $NUM_PATTERN ]]; then
    SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Dspark.locality.wait=$1"
  else
    out_error "wrong format for $2"
  fi
}

function set_spark_scheduler(){
  SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Dspark.scheduler.mode=$1"
}

function set_spark_max_failures(){
  if [[ $1 =~ $NUM_PATTERN ]]; then
    SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Dspark.task.maxFailures=$1"
  else
    out_error "wrong format for $2"
  fi
}

function set_spark_log_conf(){
  SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Dspark.logConf=$1"
}

function set_spark_master_ip() {
  if ! [[ "$1" =~ $ARG_FLAG_PATTERN ]]; then
    SPARK_MASTER_IP="$1"
  else
    out_error "wrong format for $2"
  fi
}

function set_spark_master_port() {
  if [[ $1 =~ $PORT_PATTERN ]]; then
    SPARK_MASTER_PORT="$1"
  else
    out_error "wrong format for $2"
  fi
}

function set_spark_master(){
  if ! [[ "$1" =~ $ARG_FLAG_PATTERN ]]; then
    MASTER="$1"
  else
    out_error "wrong format for $2"
  fi
}

function set_spark_mesos_coarse(){
  SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Dspark.mesos.coarse=$1"
}
function resolve_spark_master(){
  # Set MASTER from spark-env if possible
  DEFAULT_SPARK_MASTER_PORT=7077
  if [ -z "$MASTER" ]; then
    if [ -e "$FWDIR/conf/spark-env.sh" ]; then
      . "$FWDIR/conf/spark-env.sh"
    fi
    if [ -n "$SPARK_MASTER_IP" ]; then
      SPARK_MASTER_PORT="${SPARK_MASTER_PORT:-"$DEFAULT_SPARK_MASTER_PORT"}"
      export MASTER="spark://${SPARK_MASTER_IP}:${SPARK_MASTER_PORT}"
    fi
  fi
  if [ -z "$MASTER" ]; then
    out_error "Unable to determine a Spark Master; either define one in $FWDIR/conf/spark-env.sh or see usage with -h"
  fi
}
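# For reference, a minimal conf/spark-env.sh that resolve_spark_master would
# pick up (hostname is illustrative; the port defaults to 7077 when unset):
#   SPARK_MASTER_IP=192.168.1.10
#   SPARK_MASTER_PORT=7077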
function main(){
  log_info "Base Directory set to $FWDIR"
  resolve_spark_master
  log_info "Spark Master is $MASTER"
  log_info "Spark REPL options $SPARK_REPL_OPTS"
  if $cygwin; then
    # Workaround for issue involving JLine and Cygwin
    # (see http://sourceforge.net/p/jline/bugs/40/).
    # If you're using the Mintty terminal emulator in Cygwin, you may need to set the
    # "Backspace sends ^H" setting in the "Keys" section of the Mintty options
    # (see https://github.com/sbt/sbt/issues/562).
    stty -icanon min 1 -echo > /dev/null 2>&1
    export SPARK_REPL_OPTS="$SPARK_REPL_OPTS -Djline.terminal=unix"
    $FWDIR/bin/spark-class org.apache.spark.repl.Main "$@"
    stty icanon echo > /dev/null 2>&1
  else
    export SPARK_REPL_OPTS
    $FWDIR/bin/spark-class org.apache.spark.repl.Main "$@"
  fi
}
for option in "$@"
do
  case $option in
    -h | --help )
      usage
      exit 1
      ;;
    -c | --executor-cores)
      shift
      _1=$1
      shift
      set_cores $_1 "-c/--executor-cores"
      ;;
    --cores)
      shift
      _1=$1
      shift
      log_warn "The option --cores has been deprecated, please use -c/--executor-cores instead."
      set_cores $_1 "--cores"
      ;;
    -em | --executor-memory)
      shift
      _1=$1
      shift
      set_em $_1 "-em/--executor-memory"
      ;;
    -dm | --driver-memory)
      shift
      _1=$1
      shift
      set_dm $_1 "-dm/--driver-memory"
      ;;
    -ld | --local-dir)
      shift
      _1=$1
      shift
      set_localdir $_1 "-ld/--local-dir"
      ;;
    -dh | --driver-host)
      shift
      _1=$1
      shift
      set_driver_host $_1 "-dh/--driver-host"
      ;;
    -dp | --driver-port)
      shift
      _1=$1
      shift
      set_driver_port $_1 "-dp/--driver-port"
      ;;
    -mip | --master-ip)
      shift
      _1=$1
      shift
      set_spark_master_ip $_1 "-mip/--master-ip"
      ;;
    -mp | --master-port)
      shift
      _1=$1
      shift
      set_spark_master_port $_1 "-mp/--master-port"
      ;;
    -m | --master)
      shift
      _1=$1
      shift
      set_spark_master $_1 "-m/--master"
      ;;
    -uip | --ui-port)
      shift
      _1=$1
      shift
      set_uip $_1 "-uip/--ui-port"
      ;;
    --parallelism)
      shift
      _1=$1
      shift
      set_parallelism $_1 "--parallelism"
      ;;
    --locality-wait)
      shift
      _1=$1
      shift
      set_locality_wait $_1 "--locality-wait"
      ;;
    --schedule-fair)
      shift
      set_spark_scheduler "FAIR"
      ;;
    --max-failures)
      shift
      _1=$1
      shift
      set_spark_max_failures "$_1" "--max-failures"
      ;;
    --log-conf)
      shift
      set_spark_log_conf "true"
      ;;
    --mesos-coarse)
      shift
      set_spark_mesos_coarse "true"
      ;;
    -v | --verbose )
      shift
      VERBOSE=1
      ;;
    --dry-run)
      shift
      DRY_RUN=1
      ;;
    ?)
      ;;
  esac
done
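# Note on the loop above: it iterates over a fixed copy of "$@" while the
# shifts advance the live positional parameters, so for "-em 512m" the "-em"
# iteration shifts twice and reads "512m" through $1, and the later iteration
# over the bare value "512m" matches no case and is silently skipped.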
# Copy the restore-TTY-on-exit functions from the Scala script so spark-shell exits
# properly even in a binary distribution of Spark where Scala is not installed
exit_status=127
saved_stty=""
# restore stty settings (echo in particular)
function restoreSttySettings() {
  stty $saved_stty
  saved_stty=""
}
function onExit() {
  if [[ "$saved_stty" != "" ]]; then
    restoreSttySettings
  fi
  exit $exit_status
}
# to reenable echo if we are interrupted before completing.
trap onExit INT
# save terminal settings
saved_stty=$(stty -g 2>/dev/null)
# clear on error so we don't later try to restore them
# (note: "[[ ! $? ]]" would always be false, since "$?" is a non-empty string)
if [[ $? -ne 0 ]]; then
  saved_stty=""
fi
main
# record the exit status lest it be overwritten:
# then reenable echo and propagate the code.
exit_status=$?
onExit