@krishnanraman
Created May 18, 2016 01:23
$ ls -l
0
1
2
3
4
5
6
7
8
9
kraman@dell:~/workspace$ cd 0
kraman@dell:~/workspace/0$ ls
part-00000 _SUCCESS
kraman@dell:~/workspace/0$ more part-00000
CompactBuffer(C(0,record10), C(0,record20), C(0,record30), C(0,record40), C(0,record50), C(0,record60), C(0,record70), C(0,record80), C(0,record90), C(0,record100))
kraman@dell:~/workspace/0$ cd ../1
kraman@dell:~/workspace/1$ more part-00000
CompactBuffer(C(1,record1), C(1,record11), C(1,record21), C(1,record31), C(1,record41), C(1,record51), C(1,record61), C(1,record71), C(1,record81), C(1,record91))
$ spark-shell

import org.apache.spark.rdd.RDD

case class C(clientID: Long, name: String) extends Serializable // mock for CriterionRecord

val data: Seq[C] = (1 to 100).map{ i => C(i % 10, "record" + i) } // 100 records spread over 10 clients, with keys 0 to 9
val rdd: RDD[C] = sc.makeRDD(data) // RDD of criterion facts
val splits = rdd.groupBy{ x => x.clientID } // group records by client id
val keys: Seq[Long] = splits.keys.collect().toSeq // list of client ids

keys.foreach{ key: Long =>
  splits
    .filter{ x => x._1 == key }   // keep only the group for this client id
    .repartition(1)               // put all of its records into a single partition
    .map{ x => x._2 }             // drop the client id, keep only the records
    .saveAsTextFile(key.toString) // save to a directory named after the client id (swap saveAsTextFile for Avro or whatever else)
}
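
The loop above launches one Spark job per client id. As a single-pass alternative, the DataFrame writer can lay out one subdirectory per client id on its own; a minimal sketch (not from the original gist), assuming a spark-shell that provides a SQLContext as sqlContext (Spark 1.4+) and a hypothetical output path clients_by_id:

// Single-pass alternative (sketch): let the DataFrame writer create one
// subdirectory per client id instead of running one job per key.
// Assumes sqlContext is available (Spark 1.4+); the output path is hypothetical.
import sqlContext.implicits._

val df = rdd.toDF() // columns: clientID, name

df.write
  .partitionBy("clientID") // writes clients_by_id/clientID=0, clients_by_id/clientID=1, ...
  .format("parquet")       // or "json"; Avro via the spark-avro package
  .save("clients_by_id")   // hypothetical output directory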