Create a gist now

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Code to reproduce the "java.lang.OutOfMemoryError: GC overhead limit exceeded" error that occurs when repeatedly appending data to Parquet files from Apache Spark
package org.anish.spark.gcissue
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Random
/**
 * Reproduces a GC "OutOfMemoryError: GC overhead limit exceeded" seen when Spark repeatedly
 * writes partitioned data to Parquet. More details in my blog at <anish749.github.io>
 *
 * The same cached DataFrame is written `repeat` times to a single output path; the
 * `parquet.enable.summary-metadata` Hadoop setting decides whether the problem appears.
 *
 * Created by anish on 06/05/17.
 */
object SparkParquetGcOutOfMemory {
  import java.util.Locale
  import org.apache.spark.sql.SaveMode

  // Created when the object is first initialised (i.e. before argument parsing);
  // stopped when `main` returns.
  val sparkConf = new SparkConf().setAppName("ParquetGCIssue")
  val sparkContext = new SparkContext(sparkConf)
  val sqlContext = new SQLContext(sparkContext)
  import sqlContext.implicits._

  /**
   * Save modes accepted on the command line, keyed by lower-case name.
   *
   * Names are resolved to the [[SaveMode]] enum instead of being handed to
   * `DataFrameWriter.mode(String)`, because Spark versions before 2.3 reject the string
   * "errorifexists" there even though `SaveMode.ErrorIfExists` itself is supported.
   */
  private val saveModesByName: Map[String, SaveMode] = Map(
    "append" -> SaveMode.Append,
    "overwrite" -> SaveMode.Overwrite,
    "errorifexists" -> SaveMode.ErrorIfExists,
    "error" -> SaveMode.ErrorIfExists,
    "ignore" -> SaveMode.Ignore
  )

  /**
   * Entry point: parses `args`, then writes the same cached DataFrame `repeat` times.
   *
   * The SparkContext is stopped before returning, even when parsing or a write fails.
   *
   * @param args command-line arguments, see [[parseArgs]]
   * @throws IllegalArgumentException if the arguments are rejected by the parser
   */
  def main(args: Array[String]): Unit = {
    try {
      run(parseArgs(args))
    } finally {
      sparkContext.stop()
    }
  }

  /** Applies the Parquet summary-metadata setting and performs the repeated writes. */
  private def run(config: Config): Unit = {
    // This key must be set on the Hadoop configuration for it to take effect; setting the bare
    // key on SparkConf does not help. Parquet's default value is true.
    // With true we run into the GC problem after approximately 5 data loads; false works fine.
    sparkContext.hadoopConfiguration.set(
      "parquet.enable.summary-metadata", config.parquetEnableSummaryMetadata)

    // Generate the junk data once and cache it so that every write stores identical data
    // without recomputing it, until the issue shows up.
    val bigData = createData(config.rows, config.cols).cache()
    val saveMode = toSaveMode(config.outputTableSaveMode)
    try {
      for (i <- 1 to config.repeat) {
        println(s"************* MY CODE ************ Starting to write : $i")
        bigData.write
          .format(config.outputTableFormat)
          .mode(saveMode)
          .partitionBy("partitionVal")
          .save(config.outputTablePath)
        println(s"************* MY CODE ************ Completed writing : $i")
      }
    } finally {
      bigData.unpersist()
    }
  }

  /**
   * Resolves a save-mode name (any letter case) to Spark's [[SaveMode]].
   *
   * @throws IllegalArgumentException if `name` is not one of the accepted save modes
   */
  private def toSaveMode(name: String): SaveMode =
    saveModesByName.getOrElse(
      name.toLowerCase(Locale.ROOT),
      throw new IllegalArgumentException(s"Invalid savemode: $name"))

  /**
   * Creates a DataFrame of `r` random integers, with `num` copied into `c` extra columns.
   *
   * Columns: `num` (a random Int), `partitionVal` (`num % 10`) and `num_1` .. `num_c`
   * (copies of `num`). Because `num` can be negative, `partitionVal` ranges over -9..9, so a
   * write partitioned by it produces up to 19 partition directories.
   *
   * @param r number of rows
   * @param c number of copied `num_i` columns
   * @return the generated DataFrame
   */
  def createData(r: Int, c: Int): DataFrame = {
    // Add a partition column so that the data is split into directories when written.
    val df = sparkContext.parallelize(Seq.fill(r)(Random.nextInt()))
      .toDF("num")
      .withColumn("partitionVal", 'num % 10)
    (1 to c).foldLeft(df) { (acc: DataFrame, x) => acc.withColumn("num_" + x, 'num) }
  }

  /**
   * Parses command-line arguments into a [[Config]] and prints the resulting configuration.
   *
   * Options not given on the command line keep the defaults declared on [[Config]].
   * `outputTableName` and `numberOfPartitions` are parsed but not used by this job.
   *
   * @param args raw command-line arguments
   * @return the parsed configuration
   * @throws IllegalArgumentException if the parser rejects the arguments
   */
  def parseArgs(args: Array[String]): Config = {
    val parser = new scopt.OptionParser[Config]("Jar") {
      head("SparkParquetGcOutOfMemory", "0.1")
      opt[Int]('r', "rows")
        .validate(x => if (x > 1) success else failure("Rows should be > 1"))
        .action((x, c) => c.copy(rows = x))
        .text("Number of rows")
      opt[Int]('c', "cols")
        .validate(x => if (x > 1) success else failure("Cols should be > 1"))
        .action((x, c) => c.copy(cols = x))
        .text("Number of cols")
      opt[Int]('a', "repeat")
        .validate(x => if (x > 0) success else failure("Repeat should be > 0"))
        .action((x, c) => c.copy(repeat = x))
        .text("Number of repeated writes")
      opt[String]('t', "outputTableName")
        .action((x, c) => c.copy(outputTableName = x))
        .text("Output Hive Table")
      opt[String]('p', "outputTablePath")
        .required()
        .action((x, c) => c.copy(outputTablePath = x))
        .text("Path of Output Hive table")
      opt[String]('f', "outputTableFormat")
        .action((x, c) => c.copy(outputTableFormat = x))
        .text("Output table storage format")
      opt[String]('s', "outputTableSaveMode")
        .validate { x =>
          // Case-insensitive, like Spark's own save-mode parsing.
          if (saveModesByName.contains(x.toLowerCase(Locale.ROOT))) success
          else failure("Invalid savemode")
        }
        .action((x, c) => c.copy(outputTableSaveMode = x))
        .text("Save Mode to use while writing output")
      opt[Int]('m', "numberOfPartitions")
        .validate(x => if (x > 0) success else failure("Number of partitions should be > 0"))
        .action((x, c) => c.copy(numberOfPartitions = x))
        .text("Number of partitions to consider")
      opt[String]('b', "parquetEnableSummaryMetadata")
        .validate { x =>
          if (x == "true" || x == "false") success
          else failure("The property parquetEnableSummaryMetadata should be true or false only")
        }
        .action((x, c) => c.copy(parquetEnableSummaryMetadata = x))
        .text("The config value for parquet.enable.summary-metadata in hadoop configuration")
      help("help").text("prints this usage text")
    }
    parser.parse(args, Config()) match {
      case Some(c) =>
        println(s"Config for the app is ${c.prettyString}")
        c
      case None => throw new IllegalArgumentException("Invalid config")
    }
  }
}
/**
 * Command-line options for the Parquet GC reproduction job.
 *
 * Every field has a default, which is used when the corresponding flag is omitted.
 *
 * @param rows                         number of rows to generate
 * @param cols                         number of extra columns holding copies of the data
 * @param repeat                       number of times the data is written
 * @param outputTableName              name of the output table
 * @param outputTablePath              destination path of the writes
 * @param outputTableFormat            storage format used for the writes
 * @param outputTableSaveMode          save mode name used for the writes
 * @param numberOfPartitions           number of partitions to consider
 * @param parquetEnableSummaryMetadata "true" or "false", as a string
 */
case class Config(
  rows: Int = 2,
  cols: Int = 2,
  repeat: Int = 1,
  outputTableName: String = "feed.temporary",
  outputTablePath: String = "s3a://unknown/",
  outputTableFormat: String = "parquet",
  outputTableSaveMode: String = "overwrite",
  numberOfPartitions: Int = 1,
  parquetEnableSummaryMetadata: String = "true"
) {

  /**
   * Lists every field as `name = value` in declaration order.
   *
   * The result starts with a newline, places each entry on its own tab-indented line and
   * ends with a trailing newline. It is meant for logging the effective configuration.
   */
  def prettyString: String = {
    val entries: Seq[(String, Any)] = Seq(
      "rows" -> rows,
      "cols" -> cols,
      "repeat" -> repeat,
      "outputTableName" -> outputTableName,
      "outputTablePath" -> outputTablePath,
      "outputTableFormat" -> outputTableFormat,
      "outputTableSaveMode" -> outputTableSaveMode,
      "numberOfPartitions" -> numberOfPartitions,
      "parquetEnableSummaryMetadata" -> parquetEnableSummaryMetadata
    )
    entries
      .map { case (name, value) => s"$name = $value" }
      .mkString("\n\t", "\n\t", "\n")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment