@TomLous
Last active April 25, 2017 09:13
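import spark.implicits._ // encoders for toDF / as[...]; `spark` is the active SparkSession

// KvKRecord is defined elsewhere; it needs a Spark encoder (e.g. a top-level case class)
case class GroupedKvKRecord(groupId: Long, kvkRecords: Seq[KvKRecord])

// An RDD cannot be referenced inside another RDD's transformation, so collect the
// vertex records (vertexId -> KvKRecord) to the driver and broadcast them as a lookup map.
// This requires all vertex records to fit in driver and executor memory.
val bcVertices = spark.sparkContext.broadcast(vertices.collectAsMap())

// connectedComponents labels every vertex with the lowest vertex id in its connected
// component; that id becomes the groupId.
val groupedKvKRecords = linkedGraph
  .connectedComponents()
  .vertices
  .map { case (id, smallestId) => (smallestId, id) } // key each vertex by its component id
  .groupByKey()
  // Resolve vertex ids to records through the broadcast map. Using the `vertices` RDD
  // directly here fails (typically with a NullPointerException): RDD operations cannot be nested.
  .mapValues(ids => ids.map(id => bcVertices.value(id)).toList)
  .toDF("groupId", "kvkRecords")
  .as[GroupedKvKRecord]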
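Without a Spark cluster at hand, the grouping step can be sketched in plain Scala. This is a minimal, hypothetical illustration, not the gist's code: `Record` stands in for `KvKRecord`, `GroupedRecord` for `GroupedKvKRecord`, and the graph is assumed to be a list of `Long` id pairs whose endpoints all appear in the record map. Component labels come from naive min-label propagation (each vertex repeatedly takes the smallest label among itself and its neighbours). This yields the same lowest-id-per-component labelling that `connectedComponents` returns, and an ordinary `Map` plays the role of the broadcast lookup.

```scala
// Hypothetical stand-in for KvKRecord; the real fields are not shown in the gist.
final case class Record(id: Long, name: String)

// Hypothetical result shape mirroring GroupedKvKRecord: one entry per component.
final case class GroupedRecord(groupId: Long, records: Seq[Record])

// Naive min-label propagation: every vertex starts labelled with its own id and
// repeatedly adopts the smallest label among itself and its neighbours until no
// label changes. Assumes every edge endpoint is contained in vertexIds.
def connectedComponents(vertexIds: Set[Long], edges: Seq[(Long, Long)]): Map[Long, Long] = {
  // Edges are treated as undirected, matching GraphX's connectedComponents.
  val neighbours: Map[Long, Seq[Long]] =
    (edges ++ edges.map(_.swap)).groupBy(_._1).map { case (v, es) => v -> es.map(_._2) }

  var labels: Map[Long, Long] = vertexIds.map(v => v -> v).toMap
  var changed = true
  while (changed) {
    val next = labels.map { case (v, label) =>
      val candidates = label +: neighbours.getOrElse(v, Seq.empty).map(labels)
      v -> candidates.min
    }
    changed = next != labels
    labels = next
  }
  labels
}

// Swap (vertexId, componentId) to (componentId, vertexId), group by component and
// resolve each vertex id through a plain Map (the broadcast map's role in Spark).
def groupByComponent(records: Map[Long, Record], edges: Seq[(Long, Long)]): Seq[GroupedRecord] =
  connectedComponents(records.keySet, edges).toSeq
    .map { case (id, smallestId) => (smallestId, id) }
    .groupBy(_._1)
    .map { case (groupId, pairs) =>
      GroupedRecord(groupId, pairs.map(p => records(p._2)).sortBy(_.id))
    }
    .toSeq
    .sortBy(_.groupId)

// Example data: vertices 1-2-3 form one component, 4-5 form another.
val records = Map(
  1L -> Record(1L, "a"),
  2L -> Record(2L, "b"),
  3L -> Record(3L, "c"),
  4L -> Record(4L, "d"),
  5L -> Record(5L, "e")
)
val edges = Seq(2L -> 3L, 3L -> 1L, 5L -> 4L)

groupByComponent(records, edges).foreach(println)
```

On a cluster, GraphX runs this propagation as a distributed Pregel computation; the in-memory loop above only illustrates the semantics and is not meant for large graphs.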