@invkrh
Last active August 29, 2015 14:06
groupBy problem
import context.RecoSysContext._
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._
object GroupByTest extends App {
  case class purchaseLog(client_id: String, ticket_id: String, store: String)

  // The input directory was written by RDD.saveAsTextFile and holds 27 partitions.
  val input = sc.textFile("/home/spark/workspace/data/pruchaseLog")
    .map(_.split(";", -1))
    .map { case Array(cltId, tckId, store) => purchaseLog(cltId, tckId, store) }

  // Key each record by ticket_id concatenated with client_id.
  val rdd = input.map(t => (t.ticket_id + t.client_id, t))
  // Group by key, repartitioning into 8 partitions, and keep the result cached.
  val grouped = rdd.groupByKey(new HashPartitioner(8)).persist

  val keys = grouped.map(_._1)
  val distinctKeys = keys.distinct
  val keysNB = keys.count
  val distinctKeysNB = distinctKeys.count
  // Keys that occur more than once after grouping.
  val outliers = keys.map(x => (x, 1)).reduceByKey(_ + _).filter(_._2 != 1).collect.toList

  // If groupByKey works correctly, every key appears exactly once, so
  // keysNB == distinctKeysNB and outliers is Nil. The test runs show that this does not hold.
  println("keysNB = " + keysNB)
  println("distinctKeysNB = " + distinctKeysNB)
  println
  println(outliers)
}
/*
Sample data:
3801959;11775022;118
3801960;14543202;118
3801984;11781380;20
3801984;13255417;20
3802003;11777557;91
3802055;11781159;26
3802076;11782793;102
3802086;17881551;102
3802087;19064728;99
3802105;12760994;99
...
There are 27 partitions (small files). The total size is about 100 MB.
The output of this program changes on each execution:
----------------------------------------------------------------------
keysNB = 5268004
distinctKeysNB = 5268001
List((10438712894917,2), (195223741695449,2), (164936383954840,2))
----------------------------------------------------------------------
keysNB = 5268005
distinctKeysNB = 5268001
List((196559144594377,2), (104196582136634,2), (195223741695449,2), (147915951673851,2))
-----------------------------------------------------------------------------------------
keysNB = 5268006
distinctKeysNB = 5268001
List((104216263298928,2), (195223741695449,2), (147915951673851,2), (148042573468742,2), (119668991586437,2))
--------------------------------------------------------------------------------------------------------------
*/
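
For comparison, the invariant the test relies on can be sketched with plain Scala collections instead of Spark RDDs. This is only an illustration under stated assumptions: the object name `GroupByInvariantSketch`, the capitalised `PurchaseLog` class, and its camel-case fields are hypothetical and not part of the gist, and only five rows of the sample data are used. It shows what a correct grouping should report (equal counts and no outliers). It does not reproduce or explain the behaviour observed on the cluster.

```scala
object GroupByInvariantSketch {
  // Same shape as the gist's record; names are hypothetical and follow Scala conventions.
  case class PurchaseLog(clientId: String, ticketId: String, store: String)

  // A few rows copied from the sample data in the gist.
  val lines: Seq[String] = Seq(
    "3801959;11775022;118",
    "3801960;14543202;118",
    "3801984;11781380;20",
    "3801984;13255417;20",
    "3802003;11777557;91"
  )

  // Parse each line the same way the gist does.
  val logs: Seq[PurchaseLog] = lines.map(_.split(";", -1)).map {
    case Array(cltId, tckId, store) => PurchaseLog(cltId, tckId, store)
  }

  // Group by the same concatenated key as the gist (ticket id followed by client id).
  val grouped: Map[String, Seq[PurchaseLog]] = logs.groupBy(t => t.ticketId + t.clientId)

  val keys: Seq[String] = grouped.keys.toSeq
  val keysNB: Int = keys.size
  val distinctKeysNB: Int = keys.distinct.size

  // Keys that occur more than once after grouping; a correct grouping yields none.
  val outliers: List[(String, Int)] =
    keys.groupBy(identity).map { case (k, v) => (k, v.size) }.filter(_._2 != 1).toList

  def main(args: Array[String]): Unit = {
    println("keysNB = " + keysNB)
    println("distinctKeysNB = " + distinctKeysNB)
    println(outliers)
  }
}
```

Because a local `groupBy` returns a `Map`, duplicate keys are impossible by construction here, which is exactly the property the Spark job is expected to preserve after `groupByKey`.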