Workaround for a DataFrame equality bug in Spark 1.5.1
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.{SparkConf, SparkContext}

object SparkDFTest {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkDFTest")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val eventTableColumns = Seq[String](
      "entityType"
      , "entityId"
      , "targetEntityType"
      , "targetEntityId"
      , "properties"
      , "eventTime")
    // insert events
    /**
     * Input CSV format (no header):
     *
     * 1223140803222504478701,361804026,buy,1,2015-11-01T05:40:20+09:00,ib_user
     * 1223140803222504478701,361804026,buy,1,2015-11-01T05:40:20+09:00,ib_user
     * 2611150613134148167296,431634010,buy,1,2015-11-01T16:13:18+09:00,user
     * 2611151029200004176515,349617013,buy,1,2015-11-01T16:51:09+09:00,user
     */
    // CSV fields: e(0)=entityId, e(1)=targetEntityId, e(2)=event,
    // e(3)=rating, e(4)=eventTime, e(5)=entityType
    val eventDF = sc.textFile("events_s.csv").map(_.split(",")).filter(_.size >= 6)
      .map { e =>
        (
          e(5), e(0), "item", e(1), s"""{"rating": ${e(3).trim.toDouble}}""", e(4)
        )
      }.toDF(eventTableColumns:_*).cache()
    // print schema
    eventDF.printSchema()
    /**
     * root
     *  |-- entityType: string (nullable = true)
     *  |-- entityId: string (nullable = true)
     *  |-- targetEntityType: string (nullable = true)
     *  |-- targetEntityId: string (nullable = true)
     *  |-- properties: string (nullable = true)
     *  |-- eventTime: string (nullable = true)
     */
    // SELECT entityType, count(distinct entityId) AS count FROM dataframe GROUP BY entityType
    eventDF.groupBy("entityType").agg(countDistinct("entityId").alias("count")).show
    /**
     * +----------+-----+
     * |entityType|count|
     * +----------+-----+
     * |   ib_user| 4751|
     * |      user| 2091|
     * +----------+-----+
     */
    /**
     * eventDF.filter($"entityType" === "user") does NOT work here:
     * the '===' comparison returns wrong results on this cached DataFrame
     * in Spark 1.5.1. It is fixed in later releases.
     *
     * @see [[https://issues.apache.org/jira/browse/SPARK-10859]]
     */
    eventDF.filter($"entityType" === "user").select("entityId").distinct.count
    /**
     * Wrong result:
     *
     * scala> eventDF.filter($"entityType" === "user").select("entityId").distinct.count
     * res56: Long = 1219
     */
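    /**
     * Since the bad count comes from predicate pruning inside the in-memory
     * columnar cache (see the third workaround below), the plain RDD path can
     * serve as a cross-check (a sketch, not part of the original snippet;
     * column 0 is entityType, column 1 is entityId):
     */
    eventDF.rdd.filter(_.getString(0) == "user").map(_.getString(1)).distinct.count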
    /**
     * There are three workarounds for this bug.
     *
     * The first is to use the `isin` function.
     */
    eventDF.filter($"entityType" isin lit("user")).select("entityId").distinct.count
    /**
     * scala> eventDF.filter($"entityType" isin lit("user")).select("entityId").distinct.count
     * res57: Long = 2091
     */
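    /**
     * `isin` wraps its arguments in literals itself, so plain values should
     * work as well, including several at once (a sketch, not verified
     * against 1.5.1):
     */
    eventDF.filter($"entityType".isin("user", "ib_user")).select("entityId").distinct.count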
    /**
     * The second is to map each row to a case class instead of a tuple,
     * i.e. toDF() instead of toDF("col1", "col2", ...).
     * (Note that eventDF2 is not cached, unlike eventDF above.)
     */
    val eventDF2 = sc.textFile("events_s.csv").map(_.split(",")).filter(_.size >= 6)
      .map { e =>
        Event(
          e(5), e(0), "item", e(1), s"""{"rating": ${e(3).trim.toDouble}}""", e(4)
        )
      }.toDF()
    eventDF2.filter($"entityType" === "user").select("entityId").distinct.count
    /**
     * scala> eventDF2.filter($"entityType" === "user").select("entityId").distinct.count
     * res58: Long = 2091
     */
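    /**
     * Because the bug lives in the in-memory columnar cache, caching eventDF2
     * and re-running the filter would isolate the case-class effect from the
     * caching effect (a sketch; the outcome is not verified here):
     */
    eventDF2.cache()
    eventDF2.filter($"entityType" === "user").select("entityId").distinct.count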
    /**
     * The third is to turn off "spark.sql.inMemoryColumnarStorage.partitionPruning".
     */
    sqlContext.sql("SET spark.sql.inMemoryColumnarStorage.partitionPruning=false")
    eventDF.filter($"entityType" === "user").select("entityId").distinct.count
    /**
     * scala> sqlContext.sql("SET spark.sql.inMemoryColumnarStorage.partitionPruning=false")
     * res59: org.apache.spark.sql.DataFrame = [key: string, value: string]
     *
     * scala> eventDF.filter($"entityType" === "user").select("entityId").distinct.count
     * res60: Long = 2091
     */
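    /**
     * The same setting can be toggled through SQLContext.setConf, and restored
     * afterwards so later cached queries keep partition pruning (a sketch):
     */
    sqlContext.setConf("spark.sql.inMemoryColumnarStorage.partitionPruning", "true")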
    sc.stop()
  }

  case class Event(entityType: String, entityId: String,
                   targetEntityType: String, targetEntityId: String,
                   properties: String, eventTime: String)
}
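/**
 * To run this as an application (a sketch; the jar name is an assumption):
 *
 *   spark-submit --class SparkDFTest --master local[2] spark-df-test_2.10.jar
 */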