Skip to content

Instantly share code, notes, and snippets.

@rchukh
Last active November 22, 2016 11:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rchukh/84ac39310b384abedb89c299b24b9306 to your computer and use it in GitHub Desktop.
Save rchukh/84ac39310b384abedb89c299b24b9306 to your computer and use it in GitHub Desktop.
package at.ac.tuwien.thesis.problem
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
/**
 * One record of the sample data set: a city name together with its postcode.
 *
 * Both fields are kept as plain strings; the sample rows deliberately contain
 * malformed values (e.g. a numeric "city" or a non-numeric "postcode").
 *
 * @param city     city name as it appears in the raw data
 * @param postcode postcode as it appears in the raw data
 */
case class FooBar(
  city: String,
  postcode: String
)
/**
 * Minimal Spark job that applies a list of hard-coded city / postcode
 * corrections to a small in-memory data set using chained `when` expressions.
 *
 * The corrections are expressed as two `CASE WHEN` columns (one for `postcode`,
 * one for `city`). A commented-out variant further down expresses the same rules
 * as one `withColumn` call per rule; per the original author's notes that variant
 * runs out of memory once all rules are enabled.
 */
object Foo {

  /**
   * Builds the local Spark session, prints the raw and the corrected data set,
   * waits 100 seconds and then stops the session.
   *
   * An explicit `main` is used instead of `extends App`: Spark's documentation
   * advises against `scala.App` because of its delayed-initialisation behaviour.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Logger.getLogger("org").setLevel(Level.WARN)
    val conf: SparkConf = new SparkConf()
      .setAppName("spark error")
      .setMaster("local[*]")
      .set("spark.executor.memory", "2G")
      .set("spark.executor.cores", "4")
      .set("spark.default.parallelism", "4")
      // NOTE(review): in local/client mode the driver JVM is already running when
      // this is read, so this setting presumably has no effect -- confirm.
      .set("spark.driver.memory", "1G")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.speculation", "false")
      // NOTE(review): presumably an attempt to limit code generation for large
      // CASE WHEN expressions -- confirm against the Spark version in use.
      .set("spark.sql.codegen.maxCaseBranches", "1")

    val spark: SparkSession = SparkSession
      .builder()
      .config(conf)
      // .enableHiveSupport()
      .getOrCreate()

    // Stop the session even when one of the actions below throws.
    try {
      import spark.implicits._

      val myDf = Seq(
        ("Ma. Lanzendorf", "0001"), ("123", "Munich"), ("?nnsbruck", "0104"), ("Deutschwagram", "1200"),
        ("Kirchschlag i.d. Buckl.Welt", "1234"), ("Bruck/Lth", "4567"), ("Schwarzach/Pg", "9801"),
        ("somethingTofilter", "foo"), ("A", "12"), ("Wien-Vereinte Nationen", "1400"),
        ("someREally long string name", "2346"), ("C", "333"), ("D", "3456"), ("E", "5432"),
        ("F", "1234"), ("F", "2345"), ("G", "4554"), ("H", "4544"), ("j", "7777"))
        .toDF("city", "postcode")
        .as[FooBar]

      myDf.show()

      /* This is a fix for the possibly very simple match & replace cases
       * (http://stackoverflow.com/questions/40730223/spark-spelling-correction-via-udf);
       * not so sure about the more complex ones:
       *
       *   val spellingMistakes = Map(
       *     "error1" -> "fix1"
       *   )
       *
       *   val spellingNameCorrection: (String => String) = (t: String) => {
       *     spellingMistakes.get(t) match {
       *       case Some(tt) => tt // correct spelling
       *       case None => t // keep original
       *     }
       *   }
       *   val spellingUDF = udf(spellingNameCorrection)
       *
       *   val misspellings1 = hiddenSeasonalities
       *     .withColumn("A", spellingUDF('A))
       */

      // The postcode column is rewritten first, so the city rules that follow are
      // evaluated against the already-corrected postcode.
      val misspellings1 = myDf
        .filter('city =!= "somethingTofilter")
        .withColumn("postcode",
          when(('city === "A") and ('postcode === "12"), "2212")
            .when(('city === "someREally long string name") and ('postcode === "2346"), "2340")
            .when(('city === "D") and ('postcode === "3456"), "3500")
            .when(('city === "F") and ('postcode === "1234"), "2222")
            .when(('city === "F") and ('postcode === "2345"), "2222")
            .otherwise('postcode))
        .withColumn("city",
          when('city === "Ma. Lanzendorf", "Maria Lanzendorf")
            .when('city === "123", "Munich")
            .when('city === "?nnsbruck", "Innsbruck")
            .when('city === "Deutschwagram", "Deutsch-Wagram")
            .when('city === "Kirchschlag i.d. Buckl.Welt", "Kirchschlag in der Buckligen Welt")
            .when('city === "Bruck/Lth", "Gemeinde Bruck an der Leitha")
            .when('city === "Schwarzach/Pg", "Schwarzach im Pongau")
            .when(('city === "Wien-Vereinte Nationen") and ('postcode === "1400"), "Internationales Zentrum Wien")
            // NOTE(review): the sample row is ("C", "333"), so this rule does not
            // match it -- confirm whether "3333" is intended.
            .when(('city === "C") and ('postcode === "3333"), "fooCity")
            .when('city === "D", "D_city")
            .when(('city === "E") and ('postcode === "5432"), "Humenberg")
            // Compare the postcode here; the original tested 'city against both
            // "G" and "6111", a condition that can never be true.
            .when(('city === "G") and ('postcode === "6111"), "g_city")
            .when(('city === "H") and ('postcode === "4544"), "Hfixed")
            .when(('city === "j") and ('postcode === "7777"), "jfixed")
            .otherwise('city))

      // Alternative formulation with one withColumn call per rule:
      //
      // val misspellings1 = myDf.filter('city =!= "somethingTofilter")
      // .withColumn("city", when('city === "Ma. Lanzendorf", "Maria Lanzendorf").otherwise('city))
      // .withColumn("city", when('city === "123", "Munich").otherwise('city))
      // .withColumn("city", when('city === "?nnsbruck", "Innsbruck").otherwise('city))
      // .withColumn("city", when('city === "Deutschwagram", "Deutsch-Wagram").otherwise('city))
      // .withColumn("city", when('city === "Kirchschlag i.d. Buckl.Welt", "Kirchschlag in der Buckligen Welt").otherwise('city))
      // .withColumn("city", when('city === "Bruck/Lth", "Gemeinde Bruck an der Leitha").otherwise('city))
      // .withColumn("city", when('city === "Schwarzach/Pg", "Schwarzach im Pongau").otherwise('city))
      // .withColumn("postcode", when(('city === "A") and ('postcode === 12), "2212").otherwise('postcode))
      // .withColumn("city", when(('city === "Wien-Vereinte Nationen") and ('postcode === 1400), "Internationales Zentrum Wien").otherwise('city))
      // .withColumn("postcode", when(('city === "someREally long string name") and ('postcode === 2346), "2340").otherwise('postcode))
      // // in this minimal example apparently several more columns than initially supported work,
      // // but it still shows the same problems if the columns up to here are enabled
      // .withColumn("city", when(('city === "C") and ('postcode === 3333), "fooCity").otherwise('city))
      // .withColumn("postcode", when(('city === "D") and ('postcode === 3456), "3500").otherwise('postcode))
      // .withColumn("city", when('city === "D", "D_city").otherwise('city))
      // .withColumn("city", when(('city === "E") and ('postcode === 5432), "Humenberg").otherwise('city))
      // .withColumn("postcode", when(('city === "F") and ('postcode === 1234), "2222").otherwise('postcode))
      // .withColumn("postcode", when(('city === "F") and ('postcode === 2345), "2222").otherwise('postcode))
      // .withColumn("city", when(('city === "G") and ('postcode === 6111), "g_city").otherwise('city))
      // .withColumn("city", when(('city === "H") and ('postcode === 4544), "Hfixed").otherwise('city))
      // .withColumn("city", when(('city === "j") and ('postcode === 7777), "jfixed").otherwise('city))
      // if all are enabled again I get the familiar out of memory error

      // misspellings1.debugCodegen
      // print(misspellings1.explain(true))
      // import org.apache.spark.sql.execution.debug._
      // print(misspellings1.debugCodegen)
      misspellings1.show(10000)

      // Pause before shutting down -- presumably to leave time to inspect the
      // running application (e.g. the Spark UI); confirm before removing.
      Thread.sleep(100000)
    } finally {
      spark.stop()
    }
  }
}
@rchukh
Copy link
Author

rchukh commented Nov 22, 2016

== Parsed Logical Plan ==
GlobalLimit 10001
+- LocalLimit 10001
+- Project [CASE WHEN (city#5 = Ma. Lanzendorf) THEN Maria Lanzendorf WHEN (city#5 = 123) THEN Munich WHEN (city#5 = ?nnsbruck) THEN Innsbruck WHEN (city#5 = Deutschwagram) THEN Deutsch-Wagram WHEN (city#5 = Kirchschlag i.d. Buckl.Welt) THEN Kirchschlag in der Buckligen Welt WHEN (city#5 = Bruck/Lth) THEN Gemeinde Bruck an der Leitha WHEN (city#5 = Schwarzach/Pg) THEN Schwarzach im Pongau WHEN ((city#5 = Wien-Vereinte Nationen) && (postcode#18 = 1400)) THEN Internationales Zentrum Wien WHEN ((city#5 = C) && (postcode#18 = 3333)) THEN fooCity WHEN (city#5 = D) THEN D_city WHEN ((city#5 = E) && (postcode#18 = 5432)) THEN Humenberg WHEN ((city#5 = G) && (city#5 = 6111)) THEN g_city WHEN ((city#5 = H) && (postcode#18 = 4544)) THEN Hfixed WHEN ((city#5 = j) && (postcode#18 = 7777)) THEN jfixed ELSE city#5 END AS city#22, postcode#18]
+- Project [city#5, CASE WHEN ((city#5 = A) && (postcode#6 = 12)) THEN 2212 WHEN ((city#5 = someREally long string name) && (postcode#6 = 2346)) THEN 2340 WHEN ((city#5 = D) && (postcode#6 = 3456)) THEN 3500 WHEN ((city#5 = F) && (postcode#6 = 1234)) THEN 2222 WHEN ((city#5 = F) && (postcode#6 = 2345)) THEN 2222 ELSE postcode#6 END AS postcode#18]
+- Filter NOT (city#5 = somethingTofilter)
+- Project [_1#2 AS city#5, _2#3 AS postcode#6]
+- LocalRelation [_1#2, _2#3]

== Analyzed Logical Plan ==
city: string, postcode: string
GlobalLimit 10001
+- LocalLimit 10001
+- Project [CASE WHEN (city#5 = Ma. Lanzendorf) THEN Maria Lanzendorf WHEN (city#5 = 123) THEN Munich WHEN (city#5 = ?nnsbruck) THEN Innsbruck WHEN (city#5 = Deutschwagram) THEN Deutsch-Wagram WHEN (city#5 = Kirchschlag i.d. Buckl.Welt) THEN Kirchschlag in der Buckligen Welt WHEN (city#5 = Bruck/Lth) THEN Gemeinde Bruck an der Leitha WHEN (city#5 = Schwarzach/Pg) THEN Schwarzach im Pongau WHEN ((city#5 = Wien-Vereinte Nationen) && (postcode#18 = 1400)) THEN Internationales Zentrum Wien WHEN ((city#5 = C) && (postcode#18 = 3333)) THEN fooCity WHEN (city#5 = D) THEN D_city WHEN ((city#5 = E) && (postcode#18 = 5432)) THEN Humenberg WHEN ((city#5 = G) && (city#5 = 6111)) THEN g_city WHEN ((city#5 = H) && (postcode#18 = 4544)) THEN Hfixed WHEN ((city#5 = j) && (postcode#18 = 7777)) THEN jfixed ELSE city#5 END AS city#22, postcode#18]
+- Project [city#5, CASE WHEN ((city#5 = A) && (postcode#6 = 12)) THEN 2212 WHEN ((city#5 = someREally long string name) && (postcode#6 = 2346)) THEN 2340 WHEN ((city#5 = D) && (postcode#6 = 3456)) THEN 3500 WHEN ((city#5 = F) && (postcode#6 = 1234)) THEN 2222 WHEN ((city#5 = F) && (postcode#6 = 2345)) THEN 2222 ELSE postcode#6 END AS postcode#18]
+- Filter NOT (city#5 = somethingTofilter)
+- Project [_1#2 AS city#5, _2#3 AS postcode#6]
+- LocalRelation [_1#2, _2#3]

== Optimized Logical Plan ==
GlobalLimit 10001
+- LocalLimit 10001
+- Project [CASE WHEN (_1#2 = Ma. Lanzendorf) THEN Maria Lanzendorf WHEN (_1#2 = 123) THEN Munich WHEN (_1#2 = ?nnsbruck) THEN Innsbruck WHEN (_1#2 = Deutschwagram) THEN Deutsch-Wagram WHEN (_1#2 = Kirchschlag i.d. Buckl.Welt) THEN Kirchschlag in der Buckligen Welt WHEN (_1#2 = Bruck/Lth) THEN Gemeinde Bruck an der Leitha WHEN (_1#2 = Schwarzach/Pg) THEN Schwarzach im Pongau WHEN ((_1#2 = Wien-Vereinte Nationen) && (CASE WHEN ((_1#2 = A) && (_2#3 = 12)) THEN 2212 WHEN ((_1#2 = someREally long string name) && (_2#3 = 2346)) THEN 2340 WHEN ((_1#2 = D) && (_2#3 = 3456)) THEN 3500 WHEN ((_1#2 = F) && (_2#3 = 1234)) THEN 2222 WHEN ((_1#2 = F) && (_2#3 = 2345)) THEN 2222 ELSE _2#3 END = 1400)) THEN Internationales Zentrum Wien WHEN ((_1#2 = C) && (CASE WHEN ((_1#2 = A) && (_2#3 = 12)) THEN 2212 WHEN ((_1#2 = someREally long string name) && (_2#3 = 2346)) THEN 2340 WHEN ((_1#2 = D) && (_2#3 = 3456)) THEN 3500 WHEN ((_1#2 = F) && (_2#3 = 1234)) THEN 2222 WHEN ((_1#2 = F) && (_2#3 = 2345)) THEN 2222 ELSE _2#3 END = 3333)) THEN fooCity WHEN (_1#2 = D) THEN D_city WHEN ((_1#2 = E) && (CASE WHEN ((_1#2 = A) && (_2#3 = 12)) THEN 2212 WHEN ((_1#2 = someREally long string name) && (_2#3 = 2346)) THEN 2340 WHEN ((_1#2 = D) && (_2#3 = 3456)) THEN 3500 WHEN ((_1#2 = F) && (_2#3 = 1234)) THEN 2222 WHEN ((_1#2 = F) && (_2#3 = 2345)) THEN 2222 ELSE _2#3 END = 5432)) THEN Humenberg WHEN ((_1#2 = G) && (_1#2 = 6111)) THEN g_city WHEN ((_1#2 = H) && (CASE WHEN ((_1#2 = A) && (_2#3 = 12)) THEN 2212 WHEN ((_1#2 = someREally long string name) && (_2#3 = 2346)) THEN 2340 WHEN ((_1#2 = D) && (_2#3 = 3456)) THEN 3500 WHEN ((_1#2 = F) && (_2#3 = 1234)) THEN 2222 WHEN ((_1#2 = F) && (_2#3 = 2345)) THEN 2222 ELSE _2#3 END = 4544)) THEN Hfixed WHEN ((_1#2 = j) && (CASE WHEN ((_1#2 = A) && (_2#3 = 12)) THEN 2212 WHEN ((_1#2 = someREally long string name) && (_2#3 = 2346)) THEN 2340 WHEN ((_1#2 = D) && (_2#3 = 3456)) THEN 3500 WHEN ((_1#2 = F) && (_2#3 = 1234)) THEN 2222 WHEN ((_1#2 = F) && (_2#3 = 
2345)) THEN 2222 ELSE _2#3 END = 7777)) THEN jfixed ELSE _1#2 END AS city#22, CASE WHEN ((_1#2 = A) && (_2#3 = 12)) THEN 2212 WHEN ((_1#2 = someREally long string name) && (_2#3 = 2346)) THEN 2340 WHEN ((_1#2 = D) && (_2#3 = 3456)) THEN 3500 WHEN ((_1#2 = F) && (_2#3 = 1234)) THEN 2222 WHEN ((_1#2 = F) && (_2#3 = 2345)) THEN 2222 ELSE _2#3 END AS postcode#18]
+- Filter (isnotnull(_1#2) && NOT (_1#2 = somethingTofilter))
+- LocalRelation [_1#2, _2#3]

== Physical Plan ==
CollectLimit 10001
+- Project [CASE WHEN (_1#2 = Ma. Lanzendorf) THEN Maria Lanzendorf WHEN (_1#2 = 123) THEN Munich WHEN (_1#2 = ?nnsbruck) THEN Innsbruck WHEN (_1#2 = Deutschwagram) THEN Deutsch-Wagram WHEN (_1#2 = Kirchschlag i.d. Buckl.Welt) THEN Kirchschlag in der Buckligen Welt WHEN (_1#2 = Bruck/Lth) THEN Gemeinde Bruck an der Leitha WHEN (_1#2 = Schwarzach/Pg) THEN Schwarzach im Pongau WHEN ((_1#2 = Wien-Vereinte Nationen) && (CASE WHEN ((_1#2 = A) && (_2#3 = 12)) THEN 2212 WHEN ((_1#2 = someREally long string name) && (_2#3 = 2346)) THEN 2340 WHEN ((_1#2 = D) && (_2#3 = 3456)) THEN 3500 WHEN ((_1#2 = F) && (_2#3 = 1234)) THEN 2222 WHEN ((_1#2 = F) && (_2#3 = 2345)) THEN 2222 ELSE _2#3 END = 1400)) THEN Internationales Zentrum Wien WHEN ((_1#2 = C) && (CASE WHEN ((_1#2 = A) && (_2#3 = 12)) THEN 2212 WHEN ((_1#2 = someREally long string name) && (_2#3 = 2346)) THEN 2340 WHEN ((_1#2 = D) && (_2#3 = 3456)) THEN 3500 WHEN ((_1#2 = F) && (_2#3 = 1234)) THEN 2222 WHEN ((_1#2 = F) && (_2#3 = 2345)) THEN 2222 ELSE _2#3 END = 3333)) THEN fooCity WHEN (_1#2 = D) THEN D_city WHEN ((_1#2 = E) && (CASE WHEN ((_1#2 = A) && (_2#3 = 12)) THEN 2212 WHEN ((_1#2 = someREally long string name) && (_2#3 = 2346)) THEN 2340 WHEN ((_1#2 = D) && (_2#3 = 3456)) THEN 3500 WHEN ((_1#2 = F) && (_2#3 = 1234)) THEN 2222 WHEN ((_1#2 = F) && (_2#3 = 2345)) THEN 2222 ELSE _2#3 END = 5432)) THEN Humenberg WHEN ((_1#2 = G) && (_1#2 = 6111)) THEN g_city WHEN ((_1#2 = H) && (CASE WHEN ((_1#2 = A) && (_2#3 = 12)) THEN 2212 WHEN ((_1#2 = someREally long string name) && (_2#3 = 2346)) THEN 2340 WHEN ((_1#2 = D) && (_2#3 = 3456)) THEN 3500 WHEN ((_1#2 = F) && (_2#3 = 1234)) THEN 2222 WHEN ((_1#2 = F) && (_2#3 = 2345)) THEN 2222 ELSE _2#3 END = 4544)) THEN Hfixed WHEN ((_1#2 = j) && (CASE WHEN ((_1#2 = A) && (_2#3 = 12)) THEN 2212 WHEN ((_1#2 = someREally long string name) && (_2#3 = 2346)) THEN 2340 WHEN ((_1#2 = D) && (_2#3 = 3456)) THEN 3500 WHEN ((_1#2 = F) && (_2#3 = 1234)) THEN 2222 WHEN ((_1#2 = F) && (_2#3 = 
2345)) THEN 2222 ELSE _2#3 END = 7777)) THEN jfixed ELSE _1#2 END AS city#22, CASE WHEN ((_1#2 = A) && (_2#3 = 12)) THEN 2212 WHEN ((_1#2 = someREally long string name) && (_2#3 = 2346)) THEN 2340 WHEN ((_1#2 = D) && (_2#3 = 3456)) THEN 3500 WHEN ((_1#2 = F) && (_2#3 = 1234)) THEN 2222 WHEN ((_1#2 = F) && (_2#3 = 2345)) THEN 2222 ELSE _2#3 END AS postcode#18]
+- *Filter (isnotnull(_1#2) && NOT (_1#2 = somethingTofilter))
+- LocalTableScan [_1#2, _2#3]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment