Code to reproduce the "OutOfMemoryError: GC overhead limit exceeded" error when appending data to Parquet files from Apache Spark
package org.anish.spark.gcissue

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Random

/**
 * Code written to reproduce a GC OutOfMemory issue with Spark and Parquet. More details in my blog at <anish749.github.io>
 *
 * Created by anish on 06/05/17.
 */
object SparkParquetGcOutOfMemory {
  val sparkConf = new SparkConf().setAppName("ParquetGCIssue")
  val sparkContext = new SparkContext(sparkConf)
  val sqlContext = new SQLContext(sparkContext)

  import sqlContext.implicits._

  def main(args: Array[String]): Unit = {
    val config = parseArgs(args)
    // This config has to be set in the Hadoop configuration for it to take effect; setting it in the Spark conf won't help. The default value is true.
    // With the value set to true we run into problems after approximately 5 data loads; false works fine with no issues.
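    // ("Summary metadata" here refers to the _metadata / _common_metadata summary files that parquet-mr writes alongside the Parquet output.)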
    sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", config.parquetEnableSummaryMetadata)
    // Create some junk data, and cache it so it can be saved again and again until we hit the issue.
    val bigData = createData(config.rows, config.cols).cache
    // Repeatedly write the data for the configured number of save operations.
    (1 to config.repeat).foreach(i => {
      println("************* MY CODE ************ Starting to write : " + i)
      bigData.write.format(config.outputTableFormat).mode(config.outputTableSaveMode).partitionBy("partitionVal").save(config.outputTablePath)
      println("************* MY CODE ************ Completed writing : " + i)
    })
  }

  /**
   * Create a large DataFrame with r rows and c columns, set while triggering the job.
   *
   * @param r number of rows
   * @param c number of columns to copy
   * @return DataFrame with random values, partitioned into 19 partitions
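   *         (e.g. createData(5, 3) yields columns num, partitionVal, num_1, num_2, num_3)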
   */
  def createData(r: Int, c: Int): DataFrame = {
    // Create the rows and add a partition column so that the data can be partitioned while being written to S3
    val df = sparkContext.parallelize(Seq.fill(r)(Random.nextInt)).toDF("num").withColumn("partitionVal", 'num % 10)
    // Copy the data to 'c' columns
    (1 to c).foldLeft(df) { (acc: DataFrame, x) => acc.withColumn("num_" + x, 'num) }
  }

  def parseArgs(args: Array[String]): Config = {
    val sparkSaveModes = List("append", "overwrite", "ErrorIfExists", "Ignore")
    // Generating the parser with all validations
    val parser = new scopt.OptionParser[Config]("Jar") {
      head("Site Catalyst BackFill Loader", "0.1")
      opt[Int]('r', "rows") validate { x => if (x > 1) success else failure("Rows should be > 1") } action { (x, c) => c.copy(rows = x) } text "Number of rows"
      opt[Int]('c', "cols") validate { x => if (x > 1) success else failure("Cols should be > 1") } action { (x, c) => c.copy(cols = x) } text "Number of cols"
      opt[Int]('a', "repeat") validate { x => if (x > 0) success else failure("Repeat should be > 0") } action { (x, c) => c.copy(repeat = x) } text "Number of repeated writes"
      opt[String]('t', "outputTableName") action { (x, c) => c.copy(outputTableName = x) } text "Output Hive Table"
      opt[String]('p', "outputTablePath") required() action { (x, c) => c.copy(outputTablePath = x) } text "Path of Output Hive table"
      opt[String]('f', "outputTableFormat") action { (x, c) => c.copy(outputTableFormat = x) } text "Output table storage format"
      opt[String]('s', "outputTableSaveMode") validate { x => if (sparkSaveModes.contains(x)) success else failure("Invalid save mode") } action { (x, c) => c.copy(outputTableSaveMode = x) } text "Save mode to use while writing output"
      opt[Int]('m', "numberOfPartitions") validate { x => if (x > 0) success else failure("Number of partitions should be > 0") } action { (x, c) => c.copy(numberOfPartitions = x) } text "Number of partitions to consider"
      opt[String]('b', "parquetEnableSummaryMetadata") validate { x => if (x.equals("true") || x.equals("false")) success else failure("The property parquetEnableSummaryMetadata should be true or false only") } action { (x, c) => c.copy(parquetEnableSummaryMetadata = x) } text "The value for parquet.enable.summary-metadata in the Hadoop configuration"
      help("help") text "prints this usage text"
    }
    parser.parse(args, Config()) match {
      case Some(c) =>
        println("Config for the app is " + c.prettyString)
        c
      case None => throw new IllegalArgumentException("Invalid config")
    }
  }
}

case class Config
(
  rows: Int = 2,
  cols: Int = 2,
  repeat: Int = 1,
  outputTableName: String = "feed.temporary",
  outputTablePath: String = "s3a://unknown/",
  outputTableFormat: String = "parquet",
  outputTableSaveMode: String = "overwrite",
  numberOfPartitions: Int = 1,
  parquetEnableSummaryMetadata: String = "true"
) {
  def prettyString: String = "\n\t" +
    "rows = " + rows + "\n\t" +
    "cols = " + cols + "\n\t" +
    "repeat = " + repeat + "\n\t" +
    "outputTableName = " + outputTableName + "\n\t" +
    "outputTablePath = " + outputTablePath + "\n\t" +
    "outputTableFormat = " + outputTableFormat + "\n\t" +
    "outputTableSaveMode = " + outputTableSaveMode + "\n\t" +
    "numberOfPartitions = " + numberOfPartitions + "\n\t" +
    "parquetEnableSummaryMetadata = " + parquetEnableSummaryMetadata + "\n"
}
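
For quick local experimentation, a launcher along the lines of the sketch below can drive the same code end to end. The LocalRunner object, the argument values, and the local output path are illustrative assumptions, not part of the gist; since the SparkContext is created inside SparkParquetGcOutOfMemory without a master, the master has to be supplied externally (for example via spark-submit --master local[*]).

// Hypothetical launcher for local testing; every value below is an illustrative assumption.
object LocalRunner {
  def main(args: Array[String]): Unit = {
    SparkParquetGcOutOfMemory.main(Array(
      "--rows", "1000000",                                  // -r: rows of random ints
      "--cols", "50",                                       // -c: number of copied columns
      "--repeat", "20",                                     // -a: number of repeated writes
      "--outputTablePath", "file:///tmp/parquet-gc-issue",  // -p: required output path
      "--outputTableSaveMode", "append",                    // -s: save mode for the repeated writes
      "--parquetEnableSummaryMetadata", "true"              // -b: true is the setting under which the issue was seen
    ))
  }
}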