Skip to content

Instantly share code, notes, and snippets.

@yuta-imai
Created April 10, 2017 07:37
Show Gist options
  • Save yuta-imai/1412a7dd38a1a2d5805cf31ebffd53db to your computer and use it in GitHub Desktop.
Save yuta-imai/1412a7dd38a1a2d5805cf31ebffd53db to your computer and use it in GitHub Desktop.
package imaifactory.spark
import com.google.gson.{Gson, JsonElement, JsonObject}
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.SparkSession
/** Small Spark job that parses JSON arrays of `{"name": ..., "score": ...}` objects with Gson,
  * turns them into a DataFrame and prints the number of rows per `name`.
  *
  * Uses an explicit `main` instead of `extends App`: Spark's documentation warns that
  * subclasses of `scala.App` may not work correctly, because `App`'s delayed initialization
  * leaves the object's fields unset in any JVM where `main` has not been run.
  */
object YarnProblemRepro {

  /** One parsed input element.
    *
    * @param name  the JSON `name` field, or "-" when it is absent or JSON null
    * @param score the JSON `score` field, or 0 when it is absent or JSON null
    */
  case class Record(name: String, score: Int)

  /** One sample input document: a JSON array of three objects. */
  private val sampleJson: String =
    """[
      |  {"name": "Alice", "score": 2 },
      |  {"name": "Alice", "score": 1 },
      |  {"name": "Bob", "score": 2 }
      |]""".stripMargin

  /** Input strings fed to the job; each element is parsed independently. */
  private val jsonStringList: List[String] = List(sampleJson, sampleJson)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("yarnIssueTester")
      .set("spark.serializer", classOf[KryoSerializer].getName)

    val spark = SparkSession.builder.config(conf).enableHiveSupport().getOrCreate()
    try {
      import spark.implicits._

      val rddOfRecords = spark.sparkContext
        .parallelize(jsonStringList)
        .flatMap(jsonString => parseRecords(jsonString))

      rddOfRecords.toDF().groupBy("name").count().show()
    } finally {
      // Release cluster resources even if the job fails.
      spark.stop()
    }
  }

  /** Parses one JSON-array string into records.
    *
    * Empty input (for which Gson returns null) yields no records, and null array elements
    * are skipped. A missing or JSON-null `name` becomes "-", and a missing or JSON-null
    * `score` becomes 0. A non-numeric `score` still makes Gson's `getAsInt` throw.
    */
  private def parseRecords(jsonString: String): Seq[Record] = {
    val gson = new Gson
    val objects = Option(gson.fromJson(jsonString, classOf[Array[JsonObject]]))
      .getOrElse(Array.empty[JsonObject])

    objects.toSeq.filter(_ != null).map { obj =>
      Record(
        name = field(obj, "name").map(_.getAsString).getOrElse("-"),
        score = field(obj, "score").map(_.getAsInt).getOrElse(0))
    }
  }

  /** Returns member `key` of `obj`, treating both an absent key and a JSON `null` as missing. */
  private def field(obj: JsonObject, key: String): Option[JsonElement] =
    Option(obj.get(key)).filterNot(_.isJsonNull)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment