@ChongTang
Last active February 27, 2019 00:50
Things about how to tune Spark memory related parameters.

How to tune Spark w.r.t memory issues?

If you just want to know how to set the parameter values properly, go straight to the Recommended Parameter Settings section.

Some notes about cache:

Spark provides its own native caching mechanisms, which can be used through different methods such as .persist(), .cache(), and CACHE TABLE. This native caching is effective with small data sets as well as in ETL pipelines where you need to cache intermediate results. However, Spark native caching currently does not work well with partitioning, since a cached table does not retain the partitioning data. A more generic and reliable caching technique is storage layer caching.

Figure (yarn-spark-memory.png): how memory is used on each executor.

  1. yarn.nodemanager.resource.memory-mb: controls the maximum sum of memory used by all containers on each Spark node.
  2. spark.yarn.executor.memoryOverhead: The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%).
  3. spark.executor.cores: defines the maximum number of cores each executor can use.
  4. spark.executor.memory: The memory property impacts the amount of data Spark can cache, as well as the maximum sizes of the shuffle data structures used for grouping, aggregations, and joins. It also controls the executor heap size, but JVMs can also use some memory off heap, for example for interned Strings and direct byte buffers. The value of the spark.yarn.executor.memoryOverhead property is added to the executor memory to determine the full memory request to YARN for each executor. The overhead defaults to max(384, .07 * spark.executor.memory) in megabytes; newer Spark releases use a factor of 0.10 instead of 0.07.
  5. spark.driver.memory: Amount of memory to use for the driver process, i.e. where SparkContext is initialized, in the same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") (e.g. 512m, 2g).
  6. spark.dynamicAllocation.enabled: Dynamic allocation enables a Spark application to request executors when there is a backlog of pending tasks and free up executors when idle.
  7. spark.default.parallelism: Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user.
  8. spark.sql.shuffle.partitions: Configures the number of partitions to use when shuffling data for joins or aggregations.
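
As a rough illustration of how items 2 and 4 combine, the Python sketch below computes the memory requested from YARN for one executor. The 7% factor and the 384 MB floor are the defaults quoted above; the function name and the example heap sizes are made up for illustration.

```python
def yarn_request_mb(executor_memory_mb, overhead_factor=0.07, min_overhead_mb=384):
    """Return (overhead_mb, total_request_mb) for one executor container.

    overhead_factor and min_overhead_mb mirror the default quoted in the
    notes: max(384, 0.07 * spark.executor.memory). Pass 0.10 to model
    newer Spark releases.
    """
    overhead_mb = max(min_overhead_mb, int(executor_memory_mb * overhead_factor))
    return overhead_mb, executor_memory_mb + overhead_mb


if __name__ == "__main__":
    # Hypothetical heap sizes: 2G, 6G and 16G executors.
    for heap_mb in (2048, 6144, 16384):
        overhead, total = yarn_request_mb(heap_mb)
        print(f"heap={heap_mb} MB  overhead={overhead} MB  request={total} MB")
```

For small executors the 384 MB floor dominates; for large ones the percentage does. YARN may additionally round the request up to its own allocation increment, which this sketch does not model.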

To address 'out of memory' messages, try:

  1. Review the DAG and manage shuffles. Reduce shuffle cost through map-side reduction, pre-partitioning (or bucketing) the source data, maximizing single shuffles, and reducing the amount of data sent.
  2. Prefer reduceByKey, with its fixed memory limit, to groupByKey, which provides aggregations, windowing, and other functions but has an unbounded memory limit.
  3. Prefer treeReduce, which does more work on the executors or partitions, to reduce, which does all work on the driver.
  4. Leverage DataFrames rather than the lower-level RDD objects.
  5. Create ComplexTypes that encapsulate actions, such as "Top N", various aggregations, or windowing operations.
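
To make the map-side reduction in items 1 and 2 concrete, here is a plain-Python simulation (not Spark code) that counts how many records would cross the shuffle with and without a per-partition combine step. The function names and the sample data are hypothetical; real Spark also serializes, spills, and partitions by hash, none of which is modelled here.

```python
from collections import defaultdict


def sum_group_by_key_style(partitions):
    """groupByKey-style: every (key, value) record is sent through the shuffle."""
    shuffled = [record for part in partitions for record in part]
    grouped = defaultdict(list)
    for key, value in shuffled:
        grouped[key].append(value)  # all values of a key are held in memory
    totals = {key: sum(values) for key, values in grouped.items()}
    return totals, len(shuffled)


def sum_reduce_by_key_style(partitions):
    """reduceByKey-style: combine inside each partition first, then shuffle
    at most one record per key per partition."""
    shuffled = []
    for part in partitions:
        local = defaultdict(int)
        for key, value in part:
            local[key] += value  # map-side combine
        shuffled.extend(local.items())
    totals = defaultdict(int)
    for key, value in shuffled:
        totals[key] += value
    return dict(totals), len(shuffled)


if __name__ == "__main__":
    # Two hypothetical input partitions of word-count pairs.
    data = [
        [("a", 1), ("b", 1), ("a", 1), ("a", 1)],
        [("b", 1), ("b", 1), ("a", 1), ("c", 1)],
    ]
    print(sum_group_by_key_style(data))   # 8 records shuffled
    print(sum_reduce_by_key_style(data))  # 5 records shuffled
```

Both functions return the same totals; the difference is only in how many records travel and how many values are buffered per key, which is the reason the list above prefers reduceByKey.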


Some important notes:

  1. Avoid groupByKey. See "Avoid GroupByKey | Databricks Spark Knowledge Base". Basically, groupByKey creates unnecessary network overhead and collects data on the reduce workers. reduceByKey works better with larger datasets when compared to groupByKey.

  2. Data serialization. You can ignore this part since it's hard to understand for non-CS people. One thing you should keep in mind is that Kryo serialization is faster and more compact than the default Java serialization. You can change this setting in your configuration file.

  3. Reduce shuffles as much as possible. Shuffles are expensive since they involve disk I/O, data serialization, and network I/O. They are needed for operations like join or groupBy and happen between stages.

  4. Be careful with cache. Although cache() or persist() can improve running time, it uses more memory and can even cause Out of Memory errors. Think twice before adding a cache.

  5. Coalesce vs repartition. In the literature, it's often mentioned that coalesce should be preferred over repartition to reduce the number of partitions, because it avoids a shuffle step in some cases. But coalesce has some limitations (outside the scope of this article): it cannot increase the number of partitions, and it may generate skewed partitions. Here is one case where repartition should be preferred: we filter out most of a dataset.

     df.doSth().coalesce(10).write(…)

     [image: 1*GZcMFLDU-Ze5heqQv3yn_A.png]

     The good point about coalesce is that it avoids a shuffle. However, all the computation is done by only 10 tasks. The number of tasks in a stage depends on the number of partitions of the stage's output, so each of the 10 tasks processes a large chunk of data, and at most 10 nodes perform the computation.

     df.doSth().repartition(10).write(…)

     [image: 1*yasSTKQzo2hug8VvFnPf8Q.png]

     Using repartition, the total duration is much shorter (a few seconds instead of 31 minutes). The filtering is done by 200 tasks, each one working on a small subset of the data. It is also much smoother from a memory point of view.
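
The coalesce vs repartition reasoning in item 5 can be summarized with a toy model in plain Python. This is a deliberate simplification, not Spark's planner: the function name is hypothetical, and the 200 input partitions are taken from the example in the text.

```python
def upstream_parallelism(input_partitions, op, target):
    """Return (tasks that run the upstream work, whether a shuffle occurs).

    Simplified model:
    - coalesce is a narrow dependency, so it is folded into the upstream
      stage, which then runs with only `target` tasks (and it can never
      increase the partition count).
    - repartition inserts a shuffle, so the upstream stage keeps its
      original parallelism and only the post-shuffle stage has `target` tasks.
    """
    if op == "coalesce":
        return min(input_partitions, target), False
    if op == "repartition":
        return input_partitions, True
    raise ValueError(f"unknown operation: {op}")


if __name__ == "__main__":
    for op in ("coalesce", "repartition"):
        tasks, shuffle = upstream_parallelism(200, op, 10)
        print(f"{op}(10): filter runs in {tasks} tasks, shuffle={shuffle}")
```

When the upstream work is heavy and the output is small, as in the filtering case above, paying for one shuffle of the small output is usually cheaper than running the heavy work in only 10 tasks.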

Some good advice:

  1. Avoid reduceByKey when the input and output value types are different; aggregateByKey is usually a better fit in that case.
  2. Running executors with too much memory often results in excessive garbage collection delays.

How-Tos:

  1. How to get the number of partitions? df.rdd.getNumPartitions in Scala, or df.rdd.getNumPartitions() in PySpark.
  2. How do you increase the number of partitions? You can set the number of partitions in reduceByKey() on a pair RDD, such as rdd.reduceByKey(_ + _, numPartitions = X). How to decide the value of X? The most straightforward way to tune the number of partitions is experimentation: look at the number of partitions in the parent RDD and then keep multiplying that by 1.5 until performance stops improving.
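
The "multiply by 1.5 until performance stops improving" procedure can be sketched as a small loop. This is only an outline: measure_seconds is a hypothetical callback that you would implement to run your job with a given partition count and return its runtime.

```python
import math


def tune_partitions(parent_partitions, measure_seconds, factor=1.5, max_steps=10):
    """Grow the partition count by `factor` until the runtime stops improving.

    measure_seconds(n) is a caller-supplied (hypothetical) function that runs
    the job with n partitions and returns the elapsed time in seconds.
    """
    best_count = parent_partitions
    best_time = measure_seconds(best_count)
    for _ in range(max_steps):
        candidate = math.ceil(best_count * factor)
        elapsed = measure_seconds(candidate)
        if elapsed >= best_time:
            break  # no further improvement: keep the previous count
        best_count, best_time = candidate, elapsed
    return best_count


if __name__ == "__main__":
    # Fake runtime curve with a sweet spot near 300 partitions.
    fake_runtime = lambda n: abs(n - 300) + 10
    print(tune_partitions(100, fake_runtime))  # tries 150, 225, 338, 507
```

In practice runtimes are noisy, so it is worth repeating each measurement a few times before deciding that performance has stopped improving.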

WhatIfs:

  1. What if there are too few tasks? A small number of tasks also means that more memory pressure is placed on any aggregation operations that occur in each task. Any join, cogroup, or *ByKey operation involves holding objects in hashmaps or in-memory buffers to group or sort. join, cogroup, and groupByKey use these data structures in the tasks for the stages that are on the fetching side of the shuffles they trigger. reduceByKey and aggregateByKey use data structures in the tasks for the stages on both sides of the shuffles they trigger. When the records destined for these aggregation operations do not easily fit in memory, some mayhem can ensue. First, holding many records in these data structures puts pressure on garbage collection, which can lead to pauses down the line. Second, when the records do not fit in memory, Spark will spill them to disk, which causes disk I/O and sorting. This overhead during large shuffles is probably the number one cause of job stalls I have seen at Cloudera customers.
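
A back-of-the-envelope estimate shows why too few tasks leads to spilling. The sketch below assumes Spark's unified memory model (300 MB reserved, then spark.memory.fraction of the remaining heap shared by execution and storage) and an even split of execution memory across concurrently running tasks. The function names and the 500 GB shuffle size are hypothetical, and in-memory size usually differs from serialized shuffle size, so treat the numbers as orders of magnitude only.

```python
RESERVED_MB = 300  # fixed reservation in the unified memory model


def execution_mb_per_task(heap_mb, cores, memory_fraction=0.75, storage_fraction=0.5):
    """Execution memory each task can count on when storage is fully used.

    memory_fraction and storage_fraction default to the values used in the
    settings of these notes, not necessarily to Spark's own defaults.
    """
    unified = (heap_mb - RESERVED_MB) * memory_fraction
    execution = unified * (1 - storage_fraction)
    return execution / cores  # assume `cores` tasks run at once


def shuffle_mb_per_task(total_shuffle_mb, num_partitions):
    """Average amount of shuffled data one reduce-side task has to handle."""
    return total_shuffle_mb / num_partitions


if __name__ == "__main__":
    budget = execution_mb_per_task(heap_mb=6144, cores=2)
    for partitions in (180, 720):
        load = shuffle_mb_per_task(500 * 1024, partitions)
        verdict = "likely spills" if load > budget else "fits"
        print(f"{partitions} partitions: {load:.0f} MB per task "
              f"vs {budget:.0f} MB budget -> {verdict}")
```

Raising the partition count shrinks the per-task load while the per-task budget stays the same, which is why increasing parallelism is often the first thing to try.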

Recommended Parameter Settings

Suppose your cluster has 10 worker machines, each with 32G of memory and 16 cores. We will use 30G of memory and 12 cores per machine for Spark jobs. This leaves some resources for the OS and other critical processes.

<property>
  <name>spark.executor.memory</name>
  <value>6G</value>
  <description>Amount of memory to use per executor process (e.g. 2g, 8g).</description>
</property>
<property>
  <name>spark.python.worker.memory</name>
  <value>1G</value>
  <description>Amount of memory to use per python worker process during aggregation, in the same format as JVM memory strings (e.g. 512m, 2g). If the memory used during aggregation goes above this amount, it will spill the data into disks.</description>
</property>
<property>
  <name>spark.memory.fraction</name>
  <value>0.75</value>
</property>
<property>
  <name>spark.memory.storageFraction</name>
  <value>0.5</value>
  <description>Amount of storage memory immune to eviction, expressed as a fraction of the size of the region set aside by spark.memory.fraction. The higher this is, the less working memory may be available to execution and tasks may spill to disk more often. Leaving this at the default value is recommended.</description>
</property>
<property>
  <name>spark.executor.cores</name>
  <value>2</value>
</property>
<property>
  <name>spark.default.parallelism</name>
  <value>180</value>
  <description>The cluster has 120 cores in total and each executor uses 2, so there are 60 executors. 180 corresponds to 3 tasks per executor (1.5 per core); the general recommendation of 2-3 tasks per CPU core would give 240-360. Most of the time, you can increase this number to solve OutOfMemory issues.</description>
</property>
<property>
  <name>spark.sql.shuffle.partitions</name>
  <value>180</value>
  <description>Same as above.</description>
</property>
<property>
  <name>spark.serializer</name>
  <value>org.apache.spark.serializer.KryoSerializer</value>
  <description>Use Kryo serializer instead.</description>
</property>
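
Before using these values, it is worth checking that the executors actually fit into the per-machine budget described above (30G of memory and 12 cores). The sketch below is plain arithmetic under stated assumptions: a memory overhead of 10% with a 384 MB floor, and no room reserved for the driver or the YARN application master. The function name is hypothetical.

```python
def executors_per_node(node_memory_mb, node_cores, executor_memory_mb,
                       executor_cores, overhead_factor=0.10, min_overhead_mb=384):
    """Return (executors that fit on one node, container size in MB)."""
    overhead = max(min_overhead_mb, int(executor_memory_mb * overhead_factor))
    container = executor_memory_mb + overhead       # heap plus off-heap overhead
    by_memory = node_memory_mb // container         # limit imposed by memory
    by_cores = node_cores // executor_cores         # limit imposed by cores
    return min(by_memory, by_cores), container


if __name__ == "__main__":
    node_memory_mb = 30 * 1024  # 30G per machine for Spark
    for heap_mb in (6144, 4608):  # 6G as configured, 4.5G as an alternative
        count, container = executors_per_node(node_memory_mb, 12, heap_mb, 2)
        print(f"heap={heap_mb} MB -> container={container} MB, "
              f"{count} executors per node")
```

Under these assumptions a 6G heap lets only 4 executors fit per machine by memory, fewer than the 6 that 12 cores at 2 cores each would allow, while a heap of about 4.5G fits all 6. Either lower spark.executor.memory or expect fewer executors than the core count suggests.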

References:

  1. Optimize Spark jobs for performance - Azure HDInsight | Microsoft Docs
  2. How-to: Tune Your Apache Spark Jobs (Part 1) - Cloudera Engineering Blog
  3. How-to: Tune Your Apache Spark Jobs (Part 2) - Cloudera Engineering Blog