Created
May 19, 2016 20:54
-
-
Save krishnanraman/513cde559b8d963f5bf8b745490af5a8 to your computer and use it in GitHub Desktop.
PS: Use the partitionBy(col*) API in DataFrameWriter, not the code below (which works, but is outdated — circa 2015).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.reflect.ClassTag | |
import org.apache.spark.rdd.RDD | |
object Splitter {
  /** Splits `rdd` into groups keyed by `f` and writes each group to its own
    * output directory named after the key's string form.
    *
    * NOTE(review): superseded by `DataFrameWriter.partitionBy(col*)` (see the
    * gist description) — kept for reference. Assumes `U.toString` yields a
    * filesystem-safe path; verify for keys containing separators.
    *
    * @param rdd the records to split
    * @param f   key extractor; records with equal keys land in the same output
    */
  def split[T: ClassTag, U: ClassTag](rdd: RDD[T], f: T => U): Unit = {
    // Cache the grouped RDD: the per-key filter below traverses `splits` once
    // per key, and without caching Spark would recompute the whole groupBy on
    // every iteration (the original code's main inefficiency).
    val splits = rdd.groupBy { x: T => f(x) }.cache()
    val keys: Seq[U] = splits.keys.collect().toSeq
    keys.foreach { key: U =>
      splits
        .filter { x => x._1 == key }  // get records that match this key
        .repartition(1)               // put all records in 1 partition
        .map { x => x._2 }            // don't need key anymore, only records
        .saveAsTextFile(key.toString) // save to disk (swap for Avro etc. as needed)
    }
    splits.unpersist() // release the cached grouping once all keys are written
  }
}
case class C(clientID:Long, name:String) extends Serializable // mock | |
// Must live at top level: an inner definition would capture its enclosing
// instance and break closure serialization when shipped to Spark executors.
object SplitBy {
  /** Key extractor used by the demo: partitions records by client id. */
  def splitBy(c: C): Long = c.clientID
}
object SplitTest extends App {
  // Build 100 mock records spread across client ids 0 through 9.
  val data: Seq[C] = (1 to 100).toSeq.map { i => C(i % 10, "record" + i) }
  // NOTE(review): `sc` (a SparkContext) is assumed to already be in scope,
  // e.g. from a spark-shell session — it is not defined anywhere in this file.
  val rdd: RDD[C] = sc.makeRDD(data)
  // Write one output directory per client id.
  Splitter.split(rdd, SplitBy.splitBy _)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment