spark-submit template for running Spark Streaming on YARN (referenced in
# Minimum TODOs on a per job basis:
# 1. define name, application jar path, main class, queue and path
# 2. remove properties not applicable to your Spark version (Spark 1.x vs. Spark 2.x)
# 3. tweak num_executors, executor_memory (+ overhead), and backpressure settings
# the two most important settings:
# 3-5 cores per executor is a good default balancing HDFS client throughput vs. JVM overhead
# see
# backpressure
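# The command below expands several shell variables that this template never defines.
# A minimal sketch of setting them first; every value here is an illustrative
# assumption, not a recommendation taken from the original template.
num_executors=4             # assumption: size to your input rate and partition count
executor_cores=4            # within the 3-5 cores per executor range suggested above
executor_memory=4g          # assumption: below 10GB, so the explicit executor memoryOverhead further down is relevant
receiver_max_rate=1000      # assumption: records per second (per receiver, or per Kafka partition for direct streams)
receiver_initial_rate=500   # assumption: records per second used before backpressure takes over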
spark-submit --master yarn --deploy-mode cluster \
--name <my-job-name> \
--class <main-class> \
--driver-memory 2g \
--num-executors ${num_executors} --executor-cores ${executor_cores} --executor-memory ${executor_memory} \
--queue <realtime_queue> \
--files <hdfs:///path/to/> \
--conf \
--conf \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer `# Kryo Serializer is much faster than the default Java Serializer` \
--conf spark.locality.wait=10 `# Increase job parallelism by reducing Spark Delay Scheduling (potentially big performance impact (!)) (Default: 3s)` \
--conf spark.task.maxFailures=8 `# Increase max task failures before failing job (Default: 4)` \
--conf spark.ui.killEnabled=false `# Prevent killing of stages and corresponding jobs from the Spark UI` \
--conf spark.logConf=true `# Log Spark Configuration in driver log for troubleshooting` \
--conf spark.streaming.blockInterval=200 `# [Optional] Tweak to balance data processing parallelism vs. task scheduling overhead (Default: 200ms)` \
--conf spark.streaming.receiver.writeAheadLog.enable=true `# Prevent data loss on driver recovery` \
--conf spark.streaming.backpressure.enabled=true \
--conf `# [Optional] Reduce min rate of PID-based backpressure implementation (Default: 100)` \
--conf spark.streaming.receiver.maxRate=${receiver_max_rate} `# [Spark 1.x]: Workaround for missing initial rate (Default: not set)` \
--conf spark.streaming.kafka.maxRatePerPartition=${receiver_max_rate} `# [Spark 1.x]: Corresponding max rate setting for Direct Kafka Streaming (Default: not set)` \
--conf spark.streaming.backpressure.initialRate=${receiver_initial_rate} `# [Spark 2.x]: Initial rate before backpressure kicks in (Default: not set)` \
--conf spark.yarn.driver.memoryOverhead=512 `# [Optional] Set if --driver-memory < 5GB` \
--conf spark.yarn.executor.memoryOverhead=1024 `# [Optional] Set if --executor-memory < 10GB` \
--conf spark.yarn.maxAppAttempts=4 `# Increase max application master attempts (needs to be <= in YARN, which defaults to 2) (Default:` \
--conf `# Attempt counter considers only the last hour (Default: (none))` \
--conf spark.yarn.max.executor.failures=$((8 * ${num_executors})) `# Increase max executor failures (Default: max(numExecutors * 2, 3))` \
--conf spark.yarn.executor.failuresValidityInterval=1h `# Executor failure counter considers only the last hour` \

@marcraminv marcraminv commented Jun 28, 2017

Hello @bernhardschaefer, do you only set the memoryOverhead when executor memory is less than 10G? The Spark documentation talks about a 6~10% increase as the executor size grows.


@bernhardschaefer bernhardschaefer (Owner, Author) commented Jul 27, 2017

Hello @marcraminv, exactly.
I noticed that the default 10% is not sufficient for small executors. E.g. for an executor with 4GB memory the 10% default leads to roughly 400MB of overhead, which led to containers getting killed by YARN in some cases.
For everything above 10GB, at least 1GB of overhead will be allocated, so in those cases I think the default is fine.
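
To make the arithmetic in this reply concrete, here is a minimal sketch of the default overhead calculation. It assumes the rule given in the Spark on YARN configuration docs for the versions discussed here (10% of executor memory, with a 384MB floor). The function name `default_overhead_mb` is hypothetical and only for illustration.

```shell
#!/usr/bin/env bash
# Sketch of the assumed default rule: overhead = max(384MB, 10% of executor memory).
# default_overhead_mb is a hypothetical helper, not part of Spark or YARN.
default_overhead_mb() {
  local executor_memory_mb=$1
  local floor_mb=384                                # assumed minimum overhead in MB
  local ten_percent=$(( executor_memory_mb / 10 ))  # integer MB, rounded down
  if (( ten_percent > floor_mb )); then
    echo "${ten_percent}"
  else
    echo "${floor_mb}"
  fi
}

default_overhead_mb 4096    # 4GB executor: the roughly 400MB case mentioned above
default_overhead_mb 10240   # 10GB executor: the point where the default reaches 1GB
```

Under these assumptions, setting `spark.yarn.executor.memoryOverhead=1024` in the template only raises the overhead for executors below 10GB, which matches the `[Optional]` note on that setting.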
