/minimalExampleCollapsed.scala Secret
Last active
November 22, 2016 11:35
Collapsed version of https://gist.github.com/geoHeil/86e5401fc57351c70fd49047c88cea05
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package at.ac.tuwien.thesis.problem | |
import org.apache.log4j.{Level, Logger} | |
import org.apache.spark.SparkConf | |
import org.apache.spark.sql.SparkSession | |
import org.apache.spark.sql.functions._ | |
/** One row of the sample data set: a city name together with its postcode.
  *
  * Both fields are plain strings; the sample rows contain misspelled and
  * mismatched values that the correction rules in `Foo` rewrite.
  *
  * @param city     city name as it appears in the raw data
  * @param postcode postcode as it appears in the raw data
  */
final case class FooBar(city: String, postcode: String)
/** Minimal reproduction of a Spark job that applies many spelling corrections.
  *
  * The corrections are "collapsed" into one `CASE WHEN` chain per column
  * (one for `postcode`, one for `city`) instead of one `withColumn` call per
  * rule. The uncollapsed variant is kept, commented out, further down for
  * comparison.
  */
object Foo extends App {
  // NOTE(review): Spark's documentation recommends an explicit `main()` method
  // over extending `scala.App`. `App` is kept here so the object's members stay
  // accessible exactly as before.

  //  Logger.getLogger("org").setLevel(Level.WARN)

  val conf: SparkConf = new SparkConf()
    .setAppName("spark error")
    .setMaster("local[*]")
    .set("spark.executor.memory", "2G")
    .set("spark.executor.cores", "4")
    .set("spark.default.parallelism", "4")
    .set("spark.driver.memory", "1G")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.speculation", "false")
    .set("spark.sql.codegen.maxCaseBranches", "1")

  val spark: SparkSession = SparkSession
    .builder()
    .config(conf)
    //    .enableHiveSupport()
    .getOrCreate()

  import spark.implicits._

  // Sample rows as (city, postcode) pairs, typed as FooBar.
  val myDf = Seq(
    ("Ma. Lanzendorf", "0001"), ("123", "Munich"), ("?nnsbruck", "0104"), ("Deutschwagram", "1200"),
    ("Kirchschlag i.d. Buckl.Welt", "1234"), ("Bruck/Lth", "4567"), ("Schwarzach/Pg", "9801"), ("somethingTofilter", "foo"),
    ("A", "12"), ("Wien-Vereinte Nationen", "1400"), ("someREally long string name", "2346"), ("C", "333"),
    ("D", "3456"), ("E", "5432"), ("F", "1234"), ("F", "2345"), ("G", "4554"), ("H", "4544"), ("j", "7777"))
    .toDF("city", "postcode")
    .as[FooBar]

  myDf.show()

  /* This is a fix for the possibly very simple match & replace cases
   * (http://stackoverflow.com/questions/40730223/spark-spelling-correction-via-udf);
   * not so sure about the more complex ones.
   * `hiddenSeasonalities` and column "A" are placeholder names that are not
   * defined in this file.
   *
   *   val spellingMistakes = Map(
   *     "error1" -> "fix1"
   *   )
   *
   *   // Return the corrected spelling, or keep the original when none is known.
   *   val spellingNameCorrection: String => String =
   *     t => spellingMistakes.getOrElse(t, t)
   *   val spellingUDF = udf(spellingNameCorrection)
   *
   *   val misspellings1 = hiddenSeasonalities
   *     .withColumn("A", spellingUDF($"A"))
   */

  // Collapsed corrections. `postcode` is rewritten first, so the `city` rules
  // that also test `postcode` see the already-corrected postcode value.
  val misspellings1 = myDf
    .filter($"city" =!= "somethingTofilter")
    .withColumn("postcode",
      when(($"city" === "A") and ($"postcode" === "12"), "2212")
        .when(($"city" === "someREally long string name") and ($"postcode" === "2346"), "2340")
        .when(($"city" === "D") and ($"postcode" === "3456"), "3500")
        .when(($"city" === "F") and ($"postcode" === "1234"), "2222")
        .when(($"city" === "F") and ($"postcode" === "2345"), "2222")
        .otherwise($"postcode")
    )
    .withColumn("city",
      when($"city" === "Ma. Lanzendorf", "Maria Lanzendorf")
        .when($"city" === "123", "Munich")
        .when($"city" === "?nnsbruck", "Innsbruck")
        .when($"city" === "Deutschwagram", "Deutsch-Wagram")
        .when($"city" === "Kirchschlag i.d. Buckl.Welt", "Kirchschlag in der Buckligen Welt")
        .when($"city" === "Bruck/Lth", "Gemeinde Bruck an der Leitha")
        .when($"city" === "Schwarzach/Pg", "Schwarzach im Pongau")
        .when(($"city" === "Wien-Vereinte Nationen") and ($"postcode" === "1400"), "Internationales Zentrum Wien")
        // NOTE(review): the sample row is ("C", "333"), so this rule does not
        // fire on the sample data — confirm whether "3333" is intended.
        .when(($"city" === "C") and ($"postcode" === "3333"), "fooCity")
        .when($"city" === "D", "D_city")
        .when(($"city" === "E") and ($"postcode" === "5432"), "Humenberg")
        // Fixed: this rule used to compare `city` with both "G" and "6111",
        // which can never hold; the second operand is the postcode.
        // NOTE(review): the sample row is ("G", "4554") — confirm the intended postcode.
        .when(($"city" === "G") and ($"postcode" === "6111"), "g_city")
        .when(($"city" === "H") and ($"postcode" === "4544"), "Hfixed")
        .when(($"city" === "j") and ($"postcode" === "7777"), "jfixed")
        .otherwise($"city"))

  // Uncollapsed variant, one withColumn per rule, kept for reference:
  //  val misspellings1 = myDf.filter($"city" =!= "somethingTofilter")
  //    .withColumn("city", when($"city" === "Ma. Lanzendorf", "Maria Lanzendorf").otherwise($"city"))
  //    .withColumn("city", when($"city" === "123", "Munich").otherwise($"city"))
  //    .withColumn("city", when($"city" === "?nnsbruck", "Innsbruck").otherwise($"city"))
  //    .withColumn("city", when($"city" === "Deutschwagram", "Deutsch-Wagram").otherwise($"city"))
  //    .withColumn("city", when($"city" === "Kirchschlag i.d. Buckl.Welt", "Kirchschlag in der Buckligen Welt").otherwise($"city"))
  //    .withColumn("city", when($"city" === "Bruck/Lth", "Gemeinde Bruck an der Leitha").otherwise($"city"))
  //    .withColumn("city", when($"city" === "Schwarzach/Pg", "Schwarzach im Pongau").otherwise($"city"))
  //    .withColumn("postcode", when(($"city" === "A") and ($"postcode" === 12), "2212").otherwise($"postcode"))
  //    .withColumn("city", when(($"city" === "Wien-Vereinte Nationen") and ($"postcode" === 1400), "Internationales Zentrum Wien").otherwise($"city"))
  //    .withColumn("postcode", when(($"city" === "someREally long string name") and ($"postcode" === 2346), "2340").otherwise($"postcode"))
  //    // in this minimal example apparently several more columns than initially supported work. but still it shows the same problems if columns up to here are enabled
  //    .withColumn("city", when(($"city" === "C") and ($"postcode" === 3333), "fooCity").otherwise($"city"))
  //    .withColumn("postcode", when(($"city" === "D") and ($"postcode" === 3456), "3500").otherwise($"postcode"))
  //    .withColumn("city", when($"city" === "D", "D_city").otherwise($"city"))
  //    .withColumn("city", when(($"city" === "E") and ($"postcode" === 5432), "Humenberg").otherwise($"city"))
  //    .withColumn("postcode", when(($"city" === "F") and ($"postcode" === 1234), "2222").otherwise($"postcode"))
  //    .withColumn("postcode", when(($"city" === "F") and ($"postcode" === 2345), "2222").otherwise($"postcode"))
  //    .withColumn("city", when(($"city" === "G") and ($"postcode" === 6111), "g_city").otherwise($"city"))
  //    .withColumn("city", when(($"city" === "H") and ($"postcode" === 4544), "Hfixed").otherwise($"city"))
  //    .withColumn("city", when(($"city" === "j") and ($"postcode" === 7777), "jfixed").otherwise($"city"))
  // if all are enabled again I get the familiar out of memory error

  //  misspellings1.debugCodegen
  //  print(misspellings1.explain(true))
  //  import org.apache.spark.sql.execution.debug._
  //  print(misspellings1.debugCodegen)
  misspellings1.show(10000)
  //
  // Blocks for 100 seconds before shutting down — presumably to keep the
  // Spark web UI reachable for inspection; TODO confirm.
  Thread.sleep(100000)
  spark.stop()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
== Parsed Logical Plan ==
GlobalLimit 10001
+- LocalLimit 10001
+- Project [CASE WHEN (city#5 = Ma. Lanzendorf) THEN Maria Lanzendorf WHEN (city#5 = 123) THEN Munich WHEN (city#5 = ?nnsbruck) THEN Innsbruck WHEN (city#5 = Deutschwagram) THEN Deutsch-Wagram WHEN (city#5 = Kirchschlag i.d. Buckl.Welt) THEN Kirchschlag in der Buckligen Welt WHEN (city#5 = Bruck/Lth) THEN Gemeinde Bruck an der Leitha WHEN (city#5 = Schwarzach/Pg) THEN Schwarzach im Pongau WHEN ((city#5 = Wien-Vereinte Nationen) && (postcode#18 = 1400)) THEN Internationales Zentrum Wien WHEN ((city#5 = C) && (postcode#18 = 3333)) THEN fooCity WHEN (city#5 = D) THEN D_city WHEN ((city#5 = E) && (postcode#18 = 5432)) THEN Humenberg WHEN ((city#5 = G) && (city#5 = 6111)) THEN g_city WHEN ((city#5 = H) && (postcode#18 = 4544)) THEN Hfixed WHEN ((city#5 = j) && (postcode#18 = 7777)) THEN jfixed ELSE city#5 END AS city#22, postcode#18]
+- Project [city#5, CASE WHEN ((city#5 = A) && (postcode#6 = 12)) THEN 2212 WHEN ((city#5 = someREally long string name) && (postcode#6 = 2346)) THEN 2340 WHEN ((city#5 = D) && (postcode#6 = 3456)) THEN 3500 WHEN ((city#5 = F) && (postcode#6 = 1234)) THEN 2222 WHEN ((city#5 = F) && (postcode#6 = 2345)) THEN 2222 ELSE postcode#6 END AS postcode#18]
+- Filter NOT (city#5 = somethingTofilter)
+- Project [_1#2 AS city#5, _2#3 AS postcode#6]
+- LocalRelation [_1#2, _2#3]
== Analyzed Logical Plan ==
city: string, postcode: string
GlobalLimit 10001
+- LocalLimit 10001
+- Project [CASE WHEN (city#5 = Ma. Lanzendorf) THEN Maria Lanzendorf WHEN (city#5 = 123) THEN Munich WHEN (city#5 = ?nnsbruck) THEN Innsbruck WHEN (city#5 = Deutschwagram) THEN Deutsch-Wagram WHEN (city#5 = Kirchschlag i.d. Buckl.Welt) THEN Kirchschlag in der Buckligen Welt WHEN (city#5 = Bruck/Lth) THEN Gemeinde Bruck an der Leitha WHEN (city#5 = Schwarzach/Pg) THEN Schwarzach im Pongau WHEN ((city#5 = Wien-Vereinte Nationen) && (postcode#18 = 1400)) THEN Internationales Zentrum Wien WHEN ((city#5 = C) && (postcode#18 = 3333)) THEN fooCity WHEN (city#5 = D) THEN D_city WHEN ((city#5 = E) && (postcode#18 = 5432)) THEN Humenberg WHEN ((city#5 = G) && (city#5 = 6111)) THEN g_city WHEN ((city#5 = H) && (postcode#18 = 4544)) THEN Hfixed WHEN ((city#5 = j) && (postcode#18 = 7777)) THEN jfixed ELSE city#5 END AS city#22, postcode#18]
+- Project [city#5, CASE WHEN ((city#5 = A) && (postcode#6 = 12)) THEN 2212 WHEN ((city#5 = someREally long string name) && (postcode#6 = 2346)) THEN 2340 WHEN ((city#5 = D) && (postcode#6 = 3456)) THEN 3500 WHEN ((city#5 = F) && (postcode#6 = 1234)) THEN 2222 WHEN ((city#5 = F) && (postcode#6 = 2345)) THEN 2222 ELSE postcode#6 END AS postcode#18]
+- Filter NOT (city#5 = somethingTofilter)
+- Project [_1#2 AS city#5, _2#3 AS postcode#6]
+- LocalRelation [_1#2, _2#3]
== Optimized Logical Plan ==
GlobalLimit 10001
+- LocalLimit 10001
+- Project [CASE WHEN (_1#2 = Ma. Lanzendorf) THEN Maria Lanzendorf WHEN (_1#2 = 123) THEN Munich WHEN (_1#2 = ?nnsbruck) THEN Innsbruck WHEN (_1#2 = Deutschwagram) THEN Deutsch-Wagram WHEN (_1#2 = Kirchschlag i.d. Buckl.Welt) THEN Kirchschlag in der Buckligen Welt WHEN (_1#2 = Bruck/Lth) THEN Gemeinde Bruck an der Leitha WHEN (_1#2 = Schwarzach/Pg) THEN Schwarzach im Pongau WHEN ((_1#2 = Wien-Vereinte Nationen) && (CASE WHEN ((_1#2 = A) && (_2#3 = 12)) THEN 2212 WHEN ((_1#2 = someREally long string name) && (_2#3 = 2346)) THEN 2340 WHEN ((_1#2 = D) && (_2#3 = 3456)) THEN 3500 WHEN ((_1#2 = F) && (_2#3 = 1234)) THEN 2222 WHEN ((_1#2 = F) && (_2#3 = 2345)) THEN 2222 ELSE _2#3 END = 1400)) THEN Internationales Zentrum Wien WHEN ((_1#2 = C) && (CASE WHEN ((_1#2 = A) && (_2#3 = 12)) THEN 2212 WHEN ((_1#2 = someREally long string name) && (_2#3 = 2346)) THEN 2340 WHEN ((_1#2 = D) && (_2#3 = 3456)) THEN 3500 WHEN ((_1#2 = F) && (_2#3 = 1234)) THEN 2222 WHEN ((_1#2 = F) && (_2#3 = 2345)) THEN 2222 ELSE _2#3 END = 3333)) THEN fooCity WHEN (_1#2 = D) THEN D_city WHEN ((_1#2 = E) && (CASE WHEN ((_1#2 = A) && (_2#3 = 12)) THEN 2212 WHEN ((_1#2 = someREally long string name) && (_2#3 = 2346)) THEN 2340 WHEN ((_1#2 = D) && (_2#3 = 3456)) THEN 3500 WHEN ((_1#2 = F) && (_2#3 = 1234)) THEN 2222 WHEN ((_1#2 = F) && (_2#3 = 2345)) THEN 2222 ELSE _2#3 END = 5432)) THEN Humenberg WHEN ((_1#2 = G) && (_1#2 = 6111)) THEN g_city WHEN ((_1#2 = H) && (CASE WHEN ((_1#2 = A) && (_2#3 = 12)) THEN 2212 WHEN ((_1#2 = someREally long string name) && (_2#3 = 2346)) THEN 2340 WHEN ((_1#2 = D) && (_2#3 = 3456)) THEN 3500 WHEN ((_1#2 = F) && (_2#3 = 1234)) THEN 2222 WHEN ((_1#2 = F) && (_2#3 = 2345)) THEN 2222 ELSE _2#3 END = 4544)) THEN Hfixed WHEN ((_1#2 = j) && (CASE WHEN ((_1#2 = A) && (_2#3 = 12)) THEN 2212 WHEN ((_1#2 = someREally long string name) && (_2#3 = 2346)) THEN 2340 WHEN ((_1#2 = D) && (_2#3 = 3456)) THEN 3500 WHEN ((_1#2 = F) && (_2#3 = 1234)) THEN 2222 WHEN ((_1#2 = F) && (_2#3 = 
2345)) THEN 2222 ELSE _2#3 END = 7777)) THEN jfixed ELSE _1#2 END AS city#22, CASE WHEN ((_1#2 = A) && (_2#3 = 12)) THEN 2212 WHEN ((_1#2 = someREally long string name) && (_2#3 = 2346)) THEN 2340 WHEN ((_1#2 = D) && (_2#3 = 3456)) THEN 3500 WHEN ((_1#2 = F) && (_2#3 = 1234)) THEN 2222 WHEN ((_1#2 = F) && (_2#3 = 2345)) THEN 2222 ELSE _2#3 END AS postcode#18]
+- Filter (isnotnull(_1#2) && NOT (_1#2 = somethingTofilter))
+- LocalRelation [_1#2, _2#3]
== Physical Plan ==
CollectLimit 10001
+- Project [CASE WHEN (_1#2 = Ma. Lanzendorf) THEN Maria Lanzendorf WHEN (_1#2 = 123) THEN Munich WHEN (_1#2 = ?nnsbruck) THEN Innsbruck WHEN (_1#2 = Deutschwagram) THEN Deutsch-Wagram WHEN (_1#2 = Kirchschlag i.d. Buckl.Welt) THEN Kirchschlag in der Buckligen Welt WHEN (_1#2 = Bruck/Lth) THEN Gemeinde Bruck an der Leitha WHEN (_1#2 = Schwarzach/Pg) THEN Schwarzach im Pongau WHEN ((_1#2 = Wien-Vereinte Nationen) && (CASE WHEN ((_1#2 = A) && (_2#3 = 12)) THEN 2212 WHEN ((_1#2 = someREally long string name) && (_2#3 = 2346)) THEN 2340 WHEN ((_1#2 = D) && (_2#3 = 3456)) THEN 3500 WHEN ((_1#2 = F) && (_2#3 = 1234)) THEN 2222 WHEN ((_1#2 = F) && (_2#3 = 2345)) THEN 2222 ELSE _2#3 END = 1400)) THEN Internationales Zentrum Wien WHEN ((_1#2 = C) && (CASE WHEN ((_1#2 = A) && (_2#3 = 12)) THEN 2212 WHEN ((_1#2 = someREally long string name) && (_2#3 = 2346)) THEN 2340 WHEN ((_1#2 = D) && (_2#3 = 3456)) THEN 3500 WHEN ((_1#2 = F) && (_2#3 = 1234)) THEN 2222 WHEN ((_1#2 = F) && (_2#3 = 2345)) THEN 2222 ELSE _2#3 END = 3333)) THEN fooCity WHEN (_1#2 = D) THEN D_city WHEN ((_1#2 = E) && (CASE WHEN ((_1#2 = A) && (_2#3 = 12)) THEN 2212 WHEN ((_1#2 = someREally long string name) && (_2#3 = 2346)) THEN 2340 WHEN ((_1#2 = D) && (_2#3 = 3456)) THEN 3500 WHEN ((_1#2 = F) && (_2#3 = 1234)) THEN 2222 WHEN ((_1#2 = F) && (_2#3 = 2345)) THEN 2222 ELSE _2#3 END = 5432)) THEN Humenberg WHEN ((_1#2 = G) && (_1#2 = 6111)) THEN g_city WHEN ((_1#2 = H) && (CASE WHEN ((_1#2 = A) && (_2#3 = 12)) THEN 2212 WHEN ((_1#2 = someREally long string name) && (_2#3 = 2346)) THEN 2340 WHEN ((_1#2 = D) && (_2#3 = 3456)) THEN 3500 WHEN ((_1#2 = F) && (_2#3 = 1234)) THEN 2222 WHEN ((_1#2 = F) && (_2#3 = 2345)) THEN 2222 ELSE _2#3 END = 4544)) THEN Hfixed WHEN ((_1#2 = j) && (CASE WHEN ((_1#2 = A) && (_2#3 = 12)) THEN 2212 WHEN ((_1#2 = someREally long string name) && (_2#3 = 2346)) THEN 2340 WHEN ((_1#2 = D) && (_2#3 = 3456)) THEN 3500 WHEN ((_1#2 = F) && (_2#3 = 1234)) THEN 2222 WHEN ((_1#2 = F) && (_2#3 = 
2345)) THEN 2222 ELSE _2#3 END = 7777)) THEN jfixed ELSE _1#2 END AS city#22, CASE WHEN ((_1#2 = A) && (_2#3 = 12)) THEN 2212 WHEN ((_1#2 = someREally long string name) && (_2#3 = 2346)) THEN 2340 WHEN ((_1#2 = D) && (_2#3 = 3456)) THEN 3500 WHEN ((_1#2 = F) && (_2#3 = 1234)) THEN 2222 WHEN ((_1#2 = F) && (_2#3 = 2345)) THEN 2222 ELSE _2#3 END AS postcode#18]
+- *Filter (isnotnull(_1#2) && NOT (_1#2 = somethingTofilter))
+- LocalTableScan [_1#2, _2#3]