Code to produce GC OutOfMemory - Overhead Limit Exceeded error when appending data to Parquet files from Apache Spark
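To compile the gist, both Spark and scopt need to be on the classpath. The gist does not state which versions were used, so the coordinates below (Scala 2.11, Spark 1.6.x, scopt 3.x) are assumptions inferred from the SQLContext and scopt DSL usage; a minimal build.sbt sketch might look like this:

// build.sbt sketch -- versions are assumptions, not stated in the gist
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.3" % "provided", // assumed Spark version
  "org.apache.spark" %% "spark-sql"  % "1.6.3" % "provided",
  "com.github.scopt" %% "scopt"      % "3.5.0"               // scopt 3.x command-line parser
)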
package org.anish.spark.gcissue

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Random

/**
 * Code written to reproduce the GC OutOfMemory (GC overhead limit exceeded) issue with Spark and Parquet. More details in my blog at <anish749.github.io>
 *
 * Created by anish on 06/05/17.
 */
object SparkParquetGcOutOfMemory {

  val sparkConf = new SparkConf().setAppName("ParquetGCIssue")
  val sparkContext = new SparkContext(sparkConf)
  val sqlContext = new SQLContext(sparkContext)

  import sqlContext.implicits._
  def main(args: Array[String]): Unit = {
    val config = parseArgs(args)

    // This config must be set in the Hadoop configuration for it to take effect; setting it in the Spark conf won't help. The default value is true.
    // With the value set to true we run into problems after approximately 5 data loads; with false there are no issues.
    sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", config.parquetEnableSummaryMetadata)

    // Create some junk data, and cache it so it can be saved again and again until we hit the issue.
    val bigData = createData(config.rows, config.cols).cache

    // Repeatedly write the data for the configured number of save operations.
    (1 to config.repeat).foreach(i => {
      println("************* MY CODE ************ Starting to write : " + i)
      bigData.write.format(config.outputTableFormat).mode(config.outputTableSaveMode).partitionBy("partitionVal").save(config.outputTablePath)
      println("************* MY CODE ************ Completed writing : " + i)
    })
  }
  /**
   * Create a large DataFrame with r rows and c columns, set while triggering the job.
   *
   * @param r number of rows
   * @param c number of columns
   * @return DataFrame with random values, partitioned into 19 partitions (num % 10 on random ints yields values from -9 to 9)
   */
  def createData(r: Int, c: Int): DataFrame = {
    // Create the rows and add a partition column so that the data can be partitioned while being written to S3
    val df = sparkContext.parallelize(Seq.fill(r)(Random.nextInt)).toDF("num").withColumn("partitionVal", 'num % 10)
    // Copy the data to 'c' columns
    (1 to c).foldLeft(df) { (acc: DataFrame, x) => acc.withColumn("num_" + x, 'num) }
  }
  def parseArgs(args: Array[String]): Config = {
    val sparkSaveModes = List("append", "overwrite", "ErrorIfExists", "Ignore")

    // Generating the parser with all validations
    val parser = new scopt.OptionParser[Config]("Jar") {
      head("Site Catalyst BackFill Loader", "0.1")
      opt[Int]('r', "rows") validate { x => if (x > 1) success else failure("Rows should be > 1") } action { (x, c) => c.copy(rows = x) } text "Number of rows"
      opt[Int]('c', "cols") validate { x => if (x > 1) success else failure("Cols should be > 1") } action { (x, c) => c.copy(cols = x) } text "Number of cols"
      opt[Int]('a', "repeat") validate { x => if (x > 0) success else failure("Repeat should be > 0") } action { (x, c) => c.copy(repeat = x) } text "Number of repeated writes"
      opt[String]('t', "outputTableName") action { (x, c) => c.copy(outputTableName = x) } text "Output Hive table"
      opt[String]('p', "outputTablePath") required() action { (x, c) => c.copy(outputTablePath = x) } text "Path of output Hive table"
      opt[String]('f', "outputTableFormat") action { (x, c) => c.copy(outputTableFormat = x) } text "Output table storage format"
      opt[String]('s', "outputTableSaveMode") validate { x => if (sparkSaveModes.contains(x)) success else failure("Invalid save mode") } action { (x, c) => c.copy(outputTableSaveMode = x) } text "Save mode to use while writing output"
      opt[Int]('m', "numberOfPartitions") validate { x => if (x > 0) success else failure("Number of partitions should be > 0") } action { (x, c) => c.copy(numberOfPartitions = x) } text "Number of partitions to consider"
      opt[String]('b', "parquetEnableSummaryMetadata") validate { x => if (x.equals("true") || x.equals("false")) success else failure("The property parquetEnableSummaryMetadata should be true or false only") } action { (x, c) => c.copy(parquetEnableSummaryMetadata = x) } text "The value for parquet.enable.summary-metadata in the Hadoop configuration"
      help("help") text "prints this usage text"
    }

    parser.parse(args, Config()) match {
      case Some(c) =>
        println("Config for the app is " + c.prettyString)
        c
      case None => throw new IllegalArgumentException("Invalid config")
    }
  }
}
case class Config
(
  rows: Int = 2,
  cols: Int = 2,
  repeat: Int = 1,
  outputTableName: String = "feed.temporary",
  outputTablePath: String = "s3a://unknown/",
  outputTableFormat: String = "parquet",
  outputTableSaveMode: String = "overwrite",
  numberOfPartitions: Int = 1,
  parquetEnableSummaryMetadata: String = "true"
) {
  def prettyString: String = "\n\t" +
    "rows = " + rows + "\n\t" +
    "cols = " + cols + "\n\t" +
    "repeat = " + repeat + "\n\t" +
    "outputTableName = " + outputTableName + "\n\t" +
    "outputTablePath = " + outputTablePath + "\n\t" +
    "outputTableFormat = " + outputTableFormat + "\n\t" +
    "outputTableSaveMode = " + outputTableSaveMode + "\n\t" +
    "numberOfPartitions = " + numberOfPartitions + "\n\t" +
    "parquetEnableSummaryMetadata = " + parquetEnableSummaryMetadata + "\n"
}
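To run the reproduction, the assembled jar can be submitted with spark-submit. The invocation below is only a sketch: the jar name, master, driver memory, S3 bucket, and the row/column counts are placeholders chosen for illustration, while the append save mode and a repeat count above the ~5 writes mentioned in the code comments match the scenario described in the gist title.

spark-submit --class org.anish.spark.gcissue.SparkParquetGcOutOfMemory \
  --master yarn \
  --driver-memory 4g \
  spark-parquet-gc-issue.jar \
  --rows 1000000 --cols 50 --repeat 10 \
  --outputTableSaveMode append \
  --outputTablePath s3a://your-bucket/gc-issue-test/ \
  --parquetEnableSummaryMetadata true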