@krishnanraman
Created May 19, 2016 20:54
PS: Use the partitionBy(col*) API in DataFrameWriter, NOT the code below (which works, but is so... 2015).
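For reference, a minimal sketch of that approach; SparkSession requires Spark 2.0+, and the column name and output path below are illustrative, not part of this gist:

import org.apache.spark.sql.SparkSession

object PartitionByExample extends App {
  val spark = SparkSession.builder().appName("PartitionByExample").master("local[*]").getOrCreate()
  import spark.implicits._

  // same mock data as the test below: 100 records spread across 10 clients
  val df = (1 to 100).map { i => (i % 10, "record" + i) }.toDF("clientID", "name")

  df.write
    .partitionBy("clientID")      // one subdirectory per distinct clientID, e.g. clientID=0/, clientID=1/, ...
    .mode("overwrite")
    .parquet("/tmp/split_output") // hypothetical output path

  spark.stop()
}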
import scala.reflect.ClassTag
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Splitter {
  // Group the RDD by f(x), then write each group's records under a directory named after its key.
  def split[T: ClassTag, U: ClassTag](rdd: RDD[T], f: T => U): Unit = {
    val splits = rdd.groupBy { x: T => f(x) }
    val keys: Seq[U] = splits.keys.collect().toSeq
    keys.foreach { key: U =>
      splits
        .filter { x => x._1 == key }  // get records that match this key
        .repartition(1)               // put all records in 1 partition
        .flatMap { x => x._2 }        // don't need the key anymore, only the records (flatMap, so each record is written on its own line)
        .saveAsTextFile(key.toString) // save to disk (instead of saveAsTextFile, use Avro or whatever else)
    }
  }
}
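// NB: the foreach above runs one filter pass over the grouped RDD per distinct key,
// i.e. one Spark job per client; another reason to prefer DataFrameWriter.partitionBy.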
case class C(clientID: Long, name: String) extends Serializable // mock record type

// needs to be a top-level object on account of serialization issues if inner!
object SplitBy {
  def splitBy(c: C): Long = c.clientID
}
object SplitTest extends App {
  val sc = new SparkContext(new SparkConf().setAppName("SplitTest").setMaster("local[*]")) // local SparkContext for the test
  val data: Seq[C] = (1 to 100).map { i => C(i % 10, "record" + i) } // create data for 10 clients with keys 0 to 9
  val rdd: RDD[C] = sc.makeRDD(data) // RDD of mock client records
  Splitter.split(rdd, SplitBy.splitBy _)
  sc.stop()
}