Skip to content

Instantly share code, notes, and snippets.

@ceteri
Last active May 14, 2020
Embed
What would you like to do?
Intro to Apache Spark: code example for (K,V), join, operator graph
2014-03-04 15dfb8e6cc4111e3a5bb600308919594 11
2014-03-06 81da510acc4111e387f3600308919594 61
2014-03-02 15dfb8e6cc4111e3a5bb600308919594 1 33.6599436237 -117.958125229
2014-03-04 81da510acc4111e387f3600308919594 2 33.8570099635 -117.855744398
bash-3.2$ ./bin/spark-shell
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.0.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_65)
Type in expressions to have them evaluated.
Type :help for more information.
2014-04-28 12:36:48.473 java[11284:1003] Unable to load realm info from SCDynamicStore
Spark context available as sc.
scala> val format = new java.text.SimpleDateFormat("yyyy-MM-dd")
format: java.text.SimpleDateFormat = java.text.SimpleDateFormat@f67a0200
scala> case class Register (d: java.util.Date, uuid: String, cust_id: String, lat: Float, lng: Float)
defined class Register
scala> case class Click (d: java.util.Date, uuid: String, landing_page: Int)
defined class Click
scala> val reg = sc.textFile("reg.tsv").map(_.split("\t")).map(
| r => (r(1), Register(format.parse(r(0)), r(1), r(2), r(3).toFloat, r(4).toFloat))
| )
reg: org.apache.spark.rdd.RDD[(String, Register)] = MappedRDD[3] at map at <console>:21
scala> val clk = sc.textFile("clk.tsv").map(_.split("\t")).map(
| c => (c(1), Click(format.parse(c(0)), c(1), c(2).trim.toInt))
| )
clk: org.apache.spark.rdd.RDD[(String, Click)] = MappedRDD[7] at map at <console>:21
scala> reg.join(clk).take(2)
14/04/28 12:37:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/04/28 12:37:48 WARN LoadSnappy: Snappy native library not loaded
res0: Array[(String, (Register, Click))] = Array((81da510acc4111e387f3600308919594,(Register(Tue Mar 04 00:00:00 PST 2014,81da510acc4111e387f3600308919594,2,33.85701,-117.85574),Click(Thu Mar 06 00:00:00 PST 2014,81da510acc4111e387f3600308919594,61))), (15dfb8e6cc4111e3a5bb600308919594,(Register(Sun Mar 02 00:00:00 PST 2014,15dfb8e6cc4111e3a5bb600308919594,1,33.659943,-117.95812),Click(Tue Mar 04 00:00:00 PST 2014,15dfb8e6cc4111e3a5bb600308919594,11))))
scala> reg.join(clk).toDebugString
res5: String =
FlatMappedValuesRDD[46] at join at <console>:23 (1 partitions)
MappedValuesRDD[45] at join at <console>:23 (1 partitions)
CoGroupedRDD[44] at join at <console>:23 (1 partitions)
MappedRDD[36] at map at <console>:16 (1 partitions)
MappedRDD[35] at map at <console>:16 (1 partitions)
MappedRDD[34] at textFile at <console>:16 (1 partitions)
HadoopRDD[33] at textFile at <console>:16 (1 partitions)
MappedRDD[40] at map at <console>:16 (1 partitions)
MappedRDD[39] at map at <console>:16 (1 partitions)
MappedRDD[38] at textFile at <console>:16 (1 partitions)
HadoopRDD[37] at textFile at <console>:16 (1 partitions)
val format = new java.text.SimpleDateFormat("yyyy-MM-dd")
case class Register (d: java.util.Date, uuid: String, cust_id: String, lat: Float, lng: Float)
case class Click (d: java.util.Date, uuid: String, landing_page: Int)
val reg = sc.textFile("reg.tsv").map(_.split("\t")).map(
r => (r(1), Register(format.parse(r(0)), r(1), r(2), r(3).toFloat, r(4).toFloat))
)
val clk = sc.textFile("clk.tsv").map(_.split("\t")).map(
c => (c(1), Click(format.parse(c(0)), c(1), c(2).trim.toInt))
)
reg.join(clk).take(2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment