Spark Performance Tuning
1. mapPartitions() instead of map() - when some expensive initialization, such as opening a DB connection, needs to be done once per partition rather than once per record
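A minimal Scala sketch of the idea - Conn and Db below are hypothetical stand-ins for a real connection type, not Spark APIs:
import org.apache.spark.rdd.RDD

// hypothetical stand-in for an expensive-to-create resource such as a DB connection
class Conn { def query(s: String): String = s.toUpperCase; def close(): Unit = () }
object Db { def connect(): Conn = new Conn }

def enrich(lines: RDD[String]): RDD[String] =
  lines.mapPartitions { records =>
    val conn = Db.connect()                            // opened once per partition, not once per record
    val out = records.map(r => conn.query(r)).toList   // materialize before closing the connection
    conn.close()
    out.iterator
  }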
2. RDD Parallelism: for RDDs with no parent, e.g. sc.parallelize(..., 4) - unless a partition count is specified, YARN will try to use as many CPU cores as are available.
This can be tuned with the spark.default.parallelism property.
- to find the default parallelism use sc.defaultParallelism
- to find an RDD's partition count use rdd.getNumPartitions()
rdd = sc.parallelize(<value>, numSlices=4)
rdd.getNumPartitions() will return 4
the default minimum number of partitions is 2
with text files, parallelism aligns with the number of HDFS blocks.
minPartitions can be set like sc.textFile('...csv', minPartitions=4)
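A minimal Scala sketch of setting this explicitly (the value 8 is only an illustration - a common rule of thumb is 2-3 tasks per CPU core):
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("parallelism-demo")
  .set("spark.default.parallelism", "8")   // used when no partition count is given explicitly
val sc = new SparkContext(conf)
sc.defaultParallelism                      // 8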
3. Narrow dependencies/operations - data stays within the same partition, not moved across partitions
examples: map, flatMap, filter
Wide dependencies/operations - require a shuffle - reduceByKey, groupByKey, repartition, join
to adjust the number of partitions:
rdd.repartition(200) - full shuffle
rdd.coalesce(20) - merges partitions, avoids a full shuffle
With paired RDDs, keys are hashed so that related records end up in the same partition.
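A small Scala sketch contrasting the two kinds of operations and the two ways to adjust partitions:
val nums  = sc.parallelize(1 to 1000000, 200)
val evens = nums.map(_ * 2).filter(_ % 4 == 0)              // narrow: no data moves between partitions
val byMod = evens.map(n => (n % 10, 1)).reduceByKey(_ + _)  // wide: shuffles data by key
val fewer = evens.coalesce(20)       // merges partitions, avoids a full shuffle
val more  = evens.repartition(400)   // full shuffle to rebalance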
4. reduceByKey is faster than groupByKey
reduceByKey does a partial reduce within each partition before shuffling and doing the final reduce
groupByKey ships the raw data across the network and does all of the reducing after the shuffle
aggregateByKey, foldByKey and combineByKey are also preferred over groupByKey
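A word-count sketch of the difference ('input.txt' is a placeholder path):
val pairs = sc.textFile("input.txt").flatMap(_.split("\\s+")).map(w => (w, 1))
val good  = pairs.reduceByKey(_ + _)              // combines inside each partition before the shuffle
val bad   = pairs.groupByKey().mapValues(_.sum)   // ships every (word, 1) pair across the network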
5. join:
small table with large table:
use a BroadcastHashJoin, not a ShuffledHashJoin -- keep the small table in memory on every executor
the idea is similar to the Distributed Cache in MapReduce -- it avoids shuffling the large table
to see which kind of join is used, call explain() (or toDebugString on an RDD)
ANALYZE TABLE provides the statistics needed
set spark.sql.autoBroadcastJoinThreshold big enough to hold the small table
Spark SQL / DataFrames - join the large (left) side with broadcast(right):
import org.apache.spark.sql.functions.broadcast
largedataframe.join(broadcast(smalldataframe), "key")
medium table with large table:
see whether the large table can be filtered using the medium table so the shuffle of the large table is reduced - e.g. CA data vs worldwide data
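A DataFrame sketch of both cases, assuming a spark-shell style SparkSession named spark; the toy tables and the key/region/code/attr columns are made up for the illustration:
import org.apache.spark.sql.functions.broadcast
import spark.implicits._

val largeDF  = Seq((1, "CA"), (2, "NY"), (3, "CA")).toDF("key", "region")  // stands in for a big table
val smallDF  = Seq((1, "a"), (2, "b")).toDF("key", "code")
val mediumDF = Seq((1, "x"), (3, "y")).toDF("key", "attr")

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50L * 1024 * 1024)  // raise if the small table is bigger

// small vs large: broadcast the small side, so the large side is never shuffled
largeDF.join(broadcast(smallDF), "key").explain()   // plan should show a BroadcastHashJoin

// medium vs large: filter the large side first (e.g. CA only) so far less data is shuffled
largeDF.filter($"region" === "CA").join(mediumDF, "key")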
6. Do not call collect() on a large RDD.
instead use an action such as count(), take(n) or saveAsTextFile()
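For example ('/tmp/big-out' is just an illustrative output path):
val big = sc.parallelize(1 to 10000000)
big.count()                          // only a scalar comes back to the driver
big.take(10)                         // only a small sample is collected
big.saveAsTextFile("/tmp/big-out")   // written out in parallel by the executors
// big.collect()                     // would pull everything onto the driver - avoid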
7. Serialization.
Java serialization is flexible (just implement Serializable) but slow.
Kryo is up to 10x faster - classes need to be registered up front; without registering, Kryo stores the full class name with every object (wasteful)
sparkConfObj.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConfObj.registerKryoClasses(Array(classOf[class1], classOf[class2]))
if the serialized objects are big, increase spark.kryoserializer.buffer
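Putting it together, a minimal sketch (MyKey and MyValue are hypothetical application classes):
import org.apache.spark.{SparkConf, SparkContext}

case class MyKey(id: Int)
case class MyValue(name: String)

val conf = new SparkConf()
  .setAppName("kryo-demo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer", "128k")    // bump if individual serialized objects are large
  .registerKryoClasses(Array(classOf[MyKey], classOf[MyValue]))
val sc = new SparkContext(conf)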
8. Memory Tuning.
Three things to consider:
1. the amount of memory the objects take up
2. the cost of accessing those objects
3. the overhead of GC (when there is a high turnover of objects)
The problem:
Java objects are typically 2-5x bigger than the raw data.
1. each object has an object header - about 16 bytes, holding a pointer to its class - for an object with a single Int field this is a lot of overhead.
2. a String has about 40 bytes of overhead - it is stored as an array of chars, and the array length has to be kept as well.
- each char takes an extra 2 bytes to support UTF-16 encoding, so a 10-character string consumes about 60 bytes.
3. common collection classes (HashMap, LinkedList) use linked data structures - each entry has a wrapper object with its own header and an 8-byte pointer to the next object in the list.
4. collections of primitive types store them as 'boxed' objects (e.g. java.lang.Integer).
The solution strategies:
so, how to improve the memory usage of objects?
1. by changing the data structures, or
2. by storing them in a serialized format.
M: the memory used for storage and execution of Spark within the JVM heap - typically 60% of the heap; the remaining 40% is left for user data structures, internal Spark metadata, and as a safeguard against OOM errors.
M is shared by both storage and execution.
set by spark.memory.fraction
R: the storage region within M that is immune to eviction by execution.
set by spark.memory.storageFraction
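The two knobs as configuration (the values shown are the usual defaults - change them only after profiling):
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.memory.fraction", "0.6")          // M: storage + execution share of (heap - 300MB)
  .set("spark.memory.storageFraction", "0.5")   // R: the part of M protected from eviction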
Data Structure tuning:
1. prefer arrays of objects and primitive types over Java/Scala collections (e.g. HashMap) - the 'fastutil' library (an extension of the Java collections) can help
2. avoid nested structures with lots of small objects and pointers
3. for keys, use numeric IDs or enumerations instead of Strings
4. if RAM is below 32 GB, use the JVM flag -XX:+UseCompressedOops, which makes pointers 4 bytes instead of 8 - can be set in spark-env.sh
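A tiny illustration of points 1 and 3 - dense numeric ids indexing a primitive array instead of String keys in a HashMap:
// heavier: String keys, boxed values, one wrapper object per entry
val heavy: Map[String, Int] = Map("user_000001" -> 42, "user_000002" -> 7)

// lighter: numeric ids indexing a primitive array
val counts = new Array[Int](2)
counts(0) = 42   // id 0 stands for "user_000001"
counts(1) = 7    // id 1 stands for "user_000002"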
Serialized RDD storage: store RDDs in memory in serialized form using a storage level such as MEMORY_ONLY_SER - Spark then stores each RDD partition as one large byte array - the downside is having to deserialize the objects on access (slower) - use together with Kryo.
This is the first thing to try when GC is slow.
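A minimal sketch ('events.log' is a placeholder input):
import org.apache.spark.storage.StorageLevel

val data = sc.textFile("events.log")
data.persist(StorageLevel.MEMORY_ONLY_SER)   // kept serialized in memory; pairs well with Kryo
data.count()                                 // the first action materializes the cache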
GC Tuning:
add the Java options -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps to the executor Java options.
spark-submit --conf spark.eventLog.enabled=false --conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar
note: these logs are written on the worker nodes, not on the driver.
Advanced GC Tuning:
1. collect GC stats - if GC is invoked many times before tasks complete --> not enough memory is left for executing tasks!!
2. if too many minor GC collections happen, increase the size of Eden.
3. if the OldGen is close to full, reduce M (spark.memory.fraction) - better to cache fewer objects than to slow down task execution.
4. try the G1 collector with -XX:+UseG1GC
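For example, per executor (myApp.jar as in the earlier spark-submit example):
spark-submit --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" myApp.jar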
9. Data Locality - process the data where it resides.
when that is not possible, ship the code to the data rather than the data to the code.
Levels of locality, from closest to farthest:
PROCESS_LOCAL - data is in the same JVM as the running task
NODE_LOCAL - data is on the same node
NO_PREF - data is accessed equally fast from anywhere, no locality preference
RACK_LOCAL - data is on a different node in the same rack
ANY - data is elsewhere on the network, not in the same rack
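If many tasks end up at the poorer levels, the time the scheduler waits before falling back to the next level can be tuned via spark.locality.wait (default 3s); the 6s below is only an illustration:
spark-submit --conf spark.locality.wait=6s myApp.jar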
Using Kryo serialization and keeping data in serialized form should take care of most common performance problems.
-------------------------spark mistakes to avoid-----------------------------------------------
https://www.youtube.com/watch?v=WyfHUNnMutg