@ahoy-jon
Created December 28, 2017 22:39
Spark joins by forcing the partitioner on materialized data

package stategolf

import java.util.UUID

import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoder, SaveMode, SparkSession}

import scala.reflect.ClassTag
import scala.util.Random

case class A(id: String, a: String)
case class B(id: String, b: String)
case class AB(id: String, as: Seq[A], bs: Seq[B])

object GenerateTestData {

  val nbPartition = 1197

  def main(args: Array[String]): Unit = {
    // Generate A and B
    // Save A and B to disk, shuffled
    // Save A and B to disk, sorted
    // Save A and B to disk, hashed
    // Save A and B to disk, grouped sorted
    // Save A and B to disk, grouped hashed
    // Save A and B to disk, sorted with a different partition number
    // Cogroup A and B with the RDD API (6 tests)
    // Try to force the partitioner (5 tests)
    // Cogroup A and B with the Dataset API (2 tests: sorted, hashed)

    val ss: SparkSession = SparkSession.builder().master("local[1]")
      // a huge open cost keeps each parquet file in its own input partition when read back
      .config("spark.sql.files.openCostInBytes", 1000000000L)
      // generous timeouts: everything runs on a single local executor
      .config("spark.network.timeout", 10000000)
      .config("spark.executor.heartbeatInterval", 2000000)
      .getOrCreate()

    import ss.implicits._

    // Generate 1,000,000 AB records, each with a random number of nested A and B rows,
    // and materialize them once as parquet (SaveMode.Ignore skips regeneration on reruns).
    ss.sparkContext.makeRDD(1 to 1000000).repartition(nbPartition).map { i =>
      val id  = UUID.randomUUID().toString
      val nbA = Random.nextInt(40)
      val nbB = Random.nextInt(20)
      AB(id = id,
         as = for (_ <- 1 to nbA) yield A(id, Random.alphanumeric.take(8).mkString),
         bs = for (_ <- 1 to nbB) yield B(id, Random.alphanumeric.take(6).mkString))
    }.toDS.repartition(nbPartition).write.mode(SaveMode.Ignore).parquet("AB.testdata")

    val ds = ss.read.parquet("AB.testdata").as[AB]

    import org.apache.spark.sql.functions._

    // Flatten the nested A and B rows into two flat Datasets.
    val rddA = ds.select(expr("explode(as) as r")).select(expr("r.id"), expr("r.a")).as[A]
    val rddB = ds.select(expr("explode(bs) as r")).select(expr("r.id"), expr("r.b")).as[B]

    // Small helper to write any RDD that has an Encoder back to parquet.
    implicit class RDDOps[T: Encoder](rdd: RDD[T]) {
      def save(path: String): Unit =
        rdd.toDS().write.mode(SaveMode.Overwrite).parquet(path)
    }

    // The same A and B data, written out under different physical layouts.

    // Shuffled: random order, no exploitable partitioning.
    rddA.rdd.keyBy(_ => Random.nextInt()).sortByKey(numPartitions = nbPartition).map(_._2).save("A.shuffled")
    rddB.rdd.keyBy(_ => Random.nextInt()).sortByKey(numPartitions = nbPartition).map(_._2).save("B.shuffled")

    // Sorted: range-partitioned and sorted by id.
    rddA.rdd.keyBy(_.id).sortByKey(numPartitions = nbPartition).map(_._2).save("A.sorted")
    rddB.rdd.keyBy(_.id).sortByKey(numPartitions = nbPartition).map(_._2).save("B.sorted")

    // Hashed: hash-partitioned by id, so the file written for partition N contains
    // exactly the ids with hash(id) % nbPartition == N.
    rddA.rdd.keyBy(_.id).partitionBy(new HashPartitioner(nbPartition)).map(_._2).save("A.hashed")
    rddB.rdd.keyBy(_.id).partitionBy(new HashPartitioner(nbPartition)).map(_._2).save("B.hashed")

    //rddA.rdd.groupBy((x: A) => x.id, numPartitions = nbPartition).map({ case (x, s) => (x, s.toSeq) }).save("A.grouped.hashed")
    //rddB.rdd.groupBy((x: B) => x.id, numPartitions = nbPartition).map({ case (x, s) => (x, s.toSeq) }).save("B.grouped.hashed")
    //rddA.rdd.groupBy((x: A) => x.id, new RangePartitioner(nbPartition, rddA.rdd.keyBy(_.id))).map({ case (x, s) => (x, s.toSeq) }).toDS().write.parquet("A.grouped.sorted")
    //rddB.rdd.groupBy((x: B) => x.id, new RangePartitioner(nbPartition, rddB.rdd.keyBy(_.id))).map({ case (x, s) => (x, s.toSeq) }).toDS().write.parquet("B.grouped.sorted")
println("finished")
Thread.sleep(100000)
}
}

object TestJoin {
  def main(args: Array[String]): Unit = {
    val ss = SparkSession.builder().master("local[1]")
      .config("spark.executor.heartbeatInterval", 1000000L)
      .getOrCreate()
    import ss.implicits._

    println("finished")
    Thread.sleep(100000)
  }
}

// Re-declares a Partitioner on an already-materialized RDD without moving any data:
// the caller promises the data really is laid out according to `part`, so cogroup/join
// can plan narrow dependencies instead of a shuffle.
class WrapRDD[T: ClassTag](rdd: RDD[T], part: Partitioner)
  extends RDD[T](rdd.sparkContext, rdd.dependencies) {

  require(rdd.partitions.length == part.numPartitions)

  @DeveloperApi
  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    rdd.compute(split, context)

  override protected def getPartitions: Array[Partition] = rdd.partitions

  override val partitioner: Option[Partitioner] = Some(part)
}
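
// Minimal sanity-check sketch (not part of the original gist; the object name and toy data
// are illustrative): with the same partitioner re-declared on both sides, cogroup should
// plan only narrow dependencies, i.e. no ShuffleDependency.
object CheckNoShuffle {
  def main(args: Array[String]): Unit = {
    val ss = SparkSession.builder().master("local[1]").getOrCreate()
    val part = new HashPartitioner(4)

    // Hash-partition two small pair RDDs, then drop the partitioner with an identity map
    // to simulate data whose layout is right but whose partitioner is no longer known.
    val left  = ss.sparkContext.parallelize(1 to 100).keyBy(_ % 10).partitionBy(part).map(identity)
    val right = ss.sparkContext.parallelize(1 to 100).keyBy(_ % 10).partitionBy(part).map(identity)

    // Re-declare the partitioner and cogroup: no ShuffleDependency should appear.
    val cogrouped = new WrapRDD(left, part).cogroup(new WrapRDD(right, part))
    val shuffles  = cogrouped.dependencies.count(_.isInstanceOf[ShuffleDependency[_, _, _]])
    println(s"shuffle dependencies in cogroup: $shuffles") // expected: 0

    ss.stop()
  }
}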

object TestForceJoin {
  def main(args: Array[String]): Unit = {
    val ss = SparkSession.builder().master("local[1]")
      .config("spark.sql.files.openCostInBytes", 1000000000L)
      .config("spark.network.timeout", 10000000)
      .config("spark.executor.heartbeatInterval", 2000000)
      .getOrCreate()
    import ss.implicits._

    val partitioner = new HashPartitioner(GenerateTestData.nbPartition)

    // Re-attach the hash partitioner to data that was written hash-partitioned by id.
    val rddA = new WrapRDD(ss.read.parquet("A.hashed").as[A].rdd.keyBy(_.id), partitioner)
    val rddB = new WrapRDD(ss.read.parquet("B.hashed").as[B].rdd.keyBy(_.id), partitioner)

    // Forced partitioner: cogroup re-uses it on both sides, so no shuffle is planned (~4 minutes).
    rddA.cogroup(rddB)
      .map { case (k, (a, b)) => AB(k, a.toSeq, b.toSeq) }
      .toDS().write.mode(SaveMode.Overwrite).parquet("forcedJoin")

    // Dataset API cogroup on the sorted copies (~6 minutes).
    ss.read.parquet("A.sorted").as[A].groupByKey(_.id)
      .cogroup(ss.read.parquet("B.sorted").as[B].groupByKey(_.id)) {
        case (k, a, b) => Seq(AB(k, a.toSeq, b.toSeq))
      }
      .write.mode(SaveMode.Overwrite).parquet("AB.OUTSORTED")

    // Plain RDD cogroup: the partitioner is lost on read, so both sides are fully shuffled (~8 minutes).
    ss.read.parquet("A.hashed").as[A].rdd.keyBy(_.id)
      .cogroup(ss.read.parquet("B.hashed").as[B].rdd.keyBy(_.id))
      .map { case (k, (a, b)) => AB(k, a.toSeq, b.toSeq) }
      .toDS.write.mode(SaveMode.Overwrite).parquet("AB.HASHED")

    // keep the JVM (and the Spark UI) around for inspection
    Thread.sleep(3000000)
  }
}
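
// A Dataset/SQL-side sketch of the same idea (not among the original tests; the object and
// table names are illustrative): bucketBy materializes each table hash-partitioned by id,
// and a later join on id between tables bucketed with the same bucket count can skip the
// exchange. Note that bucketBy requires saveAsTable, not a plain parquet path.
object BucketedVariant {
  def main(args: Array[String]): Unit = {
    val ss = SparkSession.builder().master("local[1]").getOrCreate()
    import ss.implicits._
    import org.apache.spark.sql.functions._

    val ds  = ss.read.parquet("AB.testdata").as[AB]
    val dsA = ds.select(expr("explode(as) as r")).select(expr("r.id"), expr("r.a")).as[A]
    val dsB = ds.select(expr("explode(bs) as r")).select(expr("r.id"), expr("r.b")).as[B]

    // Write both sides bucketed (and sorted) by id with the same number of buckets.
    dsA.write.bucketBy(GenerateTestData.nbPartition, "id").sortBy("id")
      .mode(SaveMode.Overwrite).saveAsTable("a_bucketed")
    dsB.write.bucketBy(GenerateTestData.nbPartition, "id").sortBy("id")
      .mode(SaveMode.Overwrite).saveAsTable("b_bucketed")

    // The sort-merge join should not need an Exchange on either side; inspect the plan.
    ss.table("a_bucketed").join(ss.table("b_bucketed"), "id").explain()
  }
}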