Created
November 22, 2016 07:06
-
-
Save geoHeil/86e5401fc57351c70fd49047c88cea05 to your computer and use it in GitHub Desktop.
spark memory problem
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package at.ac.tuwien.thesis.problem | |
import org.apache.log4j.{Level, Logger} | |
import org.apache.spark.SparkConf | |
import org.apache.spark.sql.SparkSession | |
import org.apache.spark.sql.functions._ | |
/** Minimal record for the demo dataset: a city name and its postal code.
  * Marked `final` per Scala convention — case classes should not be extended.
  */
final case class FooBar(city: String, postcode: String)
/** Minimal reproduction of a Spark whole-stage-codegen / out-of-memory problem:
  * a long chain of `withColumn(when(...).otherwise(...))` corrections causes the
  * generated code to blow up (see SPARK-18532). Runs entirely in local mode on a
  * small inline dataset of (city, postcode) pairs.
  */
object Foo extends App {
  // Silence Spark's verbose INFO logging; keep warnings.
  Logger.getLogger("org").setLevel(Level.WARN)

  // Local-mode configuration. Executor settings are mostly inert with
  // master = local[*], but are kept to mirror the original reproduction.
  val conf: SparkConf = new SparkConf()
    .setAppName("spark error")
    .setMaster("local[*]")
    .set("spark.executor.memory", "2G")
    .set("spark.executor.cores", "4")
    .set("spark.default.parallelism", "4")
    .set("spark.driver.memory", "1G")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.speculation", "false")

  val spark: SparkSession = SparkSession
    .builder()
    .config(conf)
    //    .enableHiveSupport()
    .getOrCreate()

  import spark.implicits._
  //  import org.apache.spark.sql.execution.debug._

  // Small, deliberately messy sample data: misspelled cities, swapped fields,
  // and bad postcodes that the correction chain below is meant to repair.
  val myDf = Seq(("Ma. Lanzendorf", "0001"), ("123", "Munich"), ("?nnsbruck", "0104"), ("Deutschwagram", "1200"),
    ("Kirchschlag i.d. Buckl.Welt", "1234"), ("Bruck/Lth", "4567"), ("Schwarzach/Pg", "9801"), ("somethingTofilter", "foo"),
    ("A", "12"), ("Wien-Vereinte Nationen", "1400"), ("someREally long string name", "2346"), ("C", "333"),
    ("D", "3456"), ("E", "5432"), ("F", "1234"), ("F", "2345"), ("G", "4554"), ("H", "4544"), ("j", "7777"))
    .toDF("city", "postcode")
    .as[FooBar]

  myDf.show()

  /** A map-based alternative for the simple match-and-replace cases
    * (http://stackoverflow.com/questions/40730223/spark-spelling-correction-via-udf);
    * not suitable for the more complex, multi-column corrections below:
    *
    * {{{
    * val spellingMistakes = Map("error1" -> "fix1")
    * val spellingNameCorrection: (String => String) = (t: String) =>
    *   spellingMistakes.get(t) match {
    *     case Some(tt) => tt // correct spelling
    *     case None     => t  // keep original
    *   }
    * val spellingUDF = udf(spellingNameCorrection)
    * val misspellings1 = hiddenSeasonalities.withColumn("A", spellingUDF('A))
    * }}}
    */
  // Each withColumn adds one when/otherwise projection; the depth of this chain
  // is what triggers the codegen problem being reproduced.
  val misspellings1 = myDf.filter('city =!= "somethingTofilter")
    .withColumn("city", when('city === "Ma. Lanzendorf", "Maria Lanzendorf").otherwise('city))
    .withColumn("city", when('city === "123", "Munich").otherwise('city))
    .withColumn("city", when('city === "?nnsbruck", "Innsbruck").otherwise('city))
    .withColumn("city", when('city === "Deutschwagram", "Deutsch-Wagram").otherwise('city))
    .withColumn("city", when('city === "Kirchschlag i.d. Buckl.Welt", "Kirchschlag in der Buckligen Welt").otherwise('city))
    .withColumn("city", when('city === "Bruck/Lth", "Gemeinde Bruck an der Leitha").otherwise('city))
    .withColumn("city", when('city === "Schwarzach/Pg", "Schwarzach im Pongau").otherwise('city))
    .withColumn("postcode", when(('city === "A") and ('postcode === 12), "2212").otherwise('postcode))
    .withColumn("city", when(('city === "Wien-Vereinte Nationen") and ('postcode === 1400), "Internationales Zentrum Wien").otherwise('city))
    .withColumn("postcode", when(('city === "someREally long string name") and ('postcode === 2346), "2340").otherwise('postcode))
    // in this minimal example apparently several more columns than initially supported work.
    // but still it shows the same problems if columns up to here are enabled
    .withColumn("city", when(('city === "C") and ('postcode === 3333), "fooCity").otherwise('city))
    .withColumn("postcode", when(('city === "D") and ('postcode === 3456), "3500").otherwise('postcode))
    .withColumn("city", when('city === "D", "D_city").otherwise('city))
    .withColumn("city", when(('city === "E") and ('postcode === 5432), "Humenberg").otherwise('city))
    .withColumn("postcode", when(('city === "F") and ('postcode === 1234), "2222").otherwise('postcode))
    .withColumn("postcode", when(('city === "F") and ('postcode === 2345), "2222").otherwise('postcode))
    // FIX: the original predicate was ('city === "G") and ('city === 6111), which
    // compared the string city column against an Int twice and could never match,
    // so the "G" row was never corrected. The sample row is ("G", "4554"), so the
    // second condition presumably belongs on postcode — TODO confirm intent.
    .withColumn("city", when(('city === "G") and ('postcode === 4554), "g_city").otherwise('city))
    .withColumn("city", when(('city === "H") and ('postcode === 4544), "Hfixed").otherwise('city))
    .withColumn("city", when(('city === "j") and ('postcode === 7777), "jfixed").otherwise('city))
  // if all are enabled again I get the familiar out of memory error
  //  misspellings1.debugCodegen
  //  print(misspellings1.debugCodegen)

  misspellings1.show()
  spark.stop()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Related links: the Spark issue tracker entry https://issues.apache.org/jira/browse/SPARK-18532 and the Stack Overflow question http://stackoverflow.com/questions/40728075/spark-whole-stage-codegen-disabled.