Skip to content

Instantly share code, notes, and snippets.

@geoHeil
Created November 22, 2016 07:06
Show Gist options
  • Save geoHeil/86e5401fc57351c70fd49047c88cea05 to your computer and use it in GitHub Desktop.
Spark memory problem
package at.ac.tuwien.thesis.problem
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
/** A (city, postcode) record used as the typed schema for the sample Dataset. */
final case class FooBar(city: String, postcode: String)
/**
 * Minimal reproduction of a Spark codegen / out-of-memory problem triggered by
 * a long chain of `withColumn` spelling corrections on a small Dataset.
 * See http://stackoverflow.com/questions/40730223/spark-spelling-correction-via-udf
 *
 * Runs entirely locally (`local[*]`) and prints the Dataset before and after
 * the corrections are applied.
 */
object Foo {

  def main(args: Array[String]): Unit = {
    // Silence Spark's verbose INFO logging so the show() output stays readable.
    Logger.getLogger("org").setLevel(Level.WARN)

    val conf: SparkConf = new SparkConf()
      .setAppName("spark error")
      .setMaster("local[*]")
      .set("spark.executor.memory", "2G")
      .set("spark.executor.cores", "4")
      .set("spark.default.parallelism", "4")
      .set("spark.driver.memory", "1G")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.speculation", "false")

    val spark: SparkSession = SparkSession
      .builder()
      .config(conf)
      // .enableHiveSupport()
      .getOrCreate()

    // Needed for the 'symbol column syntax and .toDF/.as below.
    import spark.implicits._
    // import org.apache.spark.sql.execution.debug._

    // Sample (city, postcode) pairs containing deliberate misspellings and
    // swapped / malformed values that the chain below is supposed to repair.
    val myDf = Seq(("Ma. Lanzendorf", "0001"), ("123", "Munich"), ("?nnsbruck", "0104"), ("Deutschwagram", "1200"),
      ("Kirchschlag i.d. Buckl.Welt", "1234"), ("Bruck/Lth", "4567"), ("Schwarzach/Pg", "9801"), ("somethingTofilter", "foo"),
      ("A", "12"), ("Wien-Vereinte Nationen", "1400"), ("someREally long string name", "2346"), ("C", "333"),
      ("D", "3456"), ("E", "5432"), ("F", "1234"), ("F", "2345"), ("G", "4554"), ("H", "4544"), ("j", "7777"))
      .toDF("city", "postcode")
      .as[FooBar]
    myDf.show

    /**
     * Alternative approach for the simple match-and-replace cases
     * (http://stackoverflow.com/questions/40730223/spark-spelling-correction-via-udf);
     * not so sure about the more complex ones:
     *
     * {{{
     * val spellingMistakes = Map(
     *   "error1" -> "fix1"
     * )
     *
     * val spellingNameCorrection: (String => String) = (t: String) => {
     *   spellingMistakes.get(t) match {
     *     case Some(tt) => tt // correct spelling
     *     case None     => t  // keep original
     *   }
     * }
     * val spellingUDF = udf(spellingNameCorrection)
     *
     * val misspellings1 = hiddenSeasonalities
     *   .withColumn("A", spellingUDF('A))
     * }}}
     */
    val misspellings1 = myDf.filter('city =!= "somethingTofilter")
      .withColumn("city", when('city === "Ma. Lanzendorf", "Maria Lanzendorf").otherwise('city))
      .withColumn("city", when('city === "123", "Munich").otherwise('city))
      .withColumn("city", when('city === "?nnsbruck", "Innsbruck").otherwise('city))
      .withColumn("city", when('city === "Deutschwagram", "Deutsch-Wagram").otherwise('city))
      .withColumn("city", when('city === "Kirchschlag i.d. Buckl.Welt", "Kirchschlag in der Buckligen Welt").otherwise('city))
      .withColumn("city", when('city === "Bruck/Lth", "Gemeinde Bruck an der Leitha").otherwise('city))
      .withColumn("city", when('city === "Schwarzach/Pg", "Schwarzach im Pongau").otherwise('city))
      .withColumn("postcode", when(('city === "A") and ('postcode === 12), "2212").otherwise('postcode))
      .withColumn("city", when(('city === "Wien-Vereinte Nationen") and ('postcode === 1400), "Internationales Zentrum Wien").otherwise('city))
      .withColumn("postcode", when(('city === "someREally long string name") and ('postcode === 2346), "2340").otherwise('postcode))
      // in this minimal example apparently several more columns than initially supported work.
      // but still it shows the same problems if columns up to here are enabled
      // FIX: was 'postcode === 3333, which never matches — the sample row for "C" has postcode "333".
      .withColumn("city", when(('city === "C") and ('postcode === 333), "fooCity").otherwise('city))
      .withColumn("postcode", when(('city === "D") and ('postcode === 3456), "3500").otherwise('postcode))
      .withColumn("city", when('city === "D", "D_city").otherwise('city))
      .withColumn("city", when(('city === "E") and ('postcode === 5432), "Humenberg").otherwise('city))
      .withColumn("postcode", when(('city === "F") and ('postcode === 1234), "2222").otherwise('postcode))
      .withColumn("postcode", when(('city === "F") and ('postcode === 2345), "2222").otherwise('postcode))
      // FIX: was ('city === "G") and ('city === 6111) — compared `city` twice against a number,
      // so the branch could never fire; the sample row is ("G", "4554").
      .withColumn("city", when(('city === "G") and ('postcode === 4554), "g_city").otherwise('city))
      .withColumn("city", when(('city === "H") and ('postcode === 4544), "Hfixed").otherwise('city))
      .withColumn("city", when(('city === "j") and ('postcode === 7777), "jfixed").otherwise('city))
    // if all are enabled again I get the familiar out of memory error
    // misspellings1.debugCodegen
    // print(misspellings1.debugCodegen)
    misspellings1.show
    spark.stop()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment