Skip to content

Instantly share code, notes, and snippets.

@bekce
Last active June 7, 2017 14:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bekce/e14afdc30814e9d4712d0df6ac967cf0 to your computer and use it in GitHub Desktop.
Save bekce/e14afdc30814e9d4712d0df6ac967cf0 to your computer and use it in GitHub Desktop.
Spark partitioning test with multiple RDDs
import java.lang.management.ManagementFactory
import java.net.InetAddress
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkContext}
import scala.runtime.ScalaRunTime
/**
* Note: Package as a jar and run with spark-submit against a running cluster.
* Created by bekce on 6/5/17.
*/
object PartitioningTest {
class OneDimPartitioner(size: Int, perPart: Int) extends Partitioner {
private val parts = math.ceil(size * 1.0 / perPart).toInt
override def numPartitions: Int = parts
override def getPartition(key: Any): Int = key.asInstanceOf[Int] / perPart
}
def main(args: Array[String]): Unit = {
val sc = new SparkContext()
println(s"CONF=${ScalaRunTime.stringOf(sc.getConf.getAll)}, sc.defaultParallelism=${sc.defaultParallelism}")
val partitioner = new OneDimPartitioner(120, 6)
val A = sc.parallelize(0 to 119).map(t => (t, "A"+t)).partitionBy(partitioner).cache()
printAll(A, "A")
val B = sc.parallelize(0 to 119).map(t => (t, "B"+t)).partitionBy(partitioner).cache()
printAll(B, "B")
val C: RDD[(Int, (String, String))] = A.join(B, partitioner)
printAll(C, "C")
sc.stop()
}
def printAll(rdd: RDD[_ <: AnyRef], msg: String) : Unit = {
rdd.foreach(t => {
println(s"$msg, $t, jvm=${ManagementFactory.getRuntimeMXBean().getName()}, localhost=${InetAddress.getLocalHost()}")
})
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment