Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
A simple program demonstrating how HashPartitioner and RangePartitioner distribute keys in Apache Spark 1.4.1.
package org.apache.spark.test
import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
/**
* Partitioner Test
*/
/**
 * Demonstrates how [[org.apache.spark.HashPartitioner]] and
 * [[org.apache.spark.RangePartitioner]] place keys across partitions
 * (written against Apache Spark 1.4.1, local mode).
 *
 * For the original RDD and for each partitioner it prints
 * `(partitionIndex, element)` pairs, making every element's placement visible.
 */
object PartitionerTest {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      // Six integers spread over two partitions in parallelize order.
      val rdd = sc.parallelize(Array(1, 3, 6, 5, 4, 2), 2)

      // Original placement: tag each element with the index of its partition.
      println(rdd.mapPartitionsWithIndex((index, iter) =>
        iter.map(v => (index, v))
      ).collect.mkString(", "))

      // Key each element by itself so partitioners have a key to work with.
      val rddWithKey = rdd.keyBy(x => x)

      /* T: HashPartitioner — key lands in partition hash(key) % numPartitions. */
      println(placement(rddWithKey.groupByKey(new HashPartitioner(2))))

      /* T: RangePartitioner — keys are split into sorted, contiguous ranges
       * sampled from `rddWithKey`. */
      println(placement(rddWithKey.groupByKey(new RangePartitioner(2, rddWithKey))))
    } finally {
      // Release the local SparkContext even when a job above fails.
      sc.stop()
    }
  }

  /**
   * Flattens a grouped RDD into a comma-separated list of
   * `(partitionIndex, value)` pairs, one pair per original element,
   * preserving the iteration order within each partition.
   *
   * @param grouped result of `groupByKey` with the partitioner under test
   * @return string such as "(0,1), (0,3), (1,5)"
   */
  private def placement(grouped: RDD[(Int, Iterable[Int])]): String =
    grouped.mapPartitionsWithIndex((index, iter) =>
      iter.flatMap { case (_, values) => values.map(v => (index, v)) }
    ).collect.mkString(", ")

  /*
  Output:
  (0,1), (0,3), (0,6), (1,5), (1,4), (1,2)
  (0,4), (0,6), (0,2), (1,1), (1,3), (1,5)
  (0,1), (0,3), (0,2), (1,4), (1,6), (1,5)
  */
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.