Skip to content

Instantly share code, notes, and snippets.

@sidnt
Created September 11, 2018 11:54
Show Gist options
  • Save sidnt/a0d9281c7586d5340b17ce9fee67c202 to your computer and use it in GitHub Desktop.
Save sidnt/a0d9281c7586d5340b17ce9fee67c202 to your computer and use it in GitHub Desktop.
/* Run inside spark-shell: the snippets below rely on the SparkContext `sc` that the shell predefines. */
import org.apache.spark.rdd.RDD
/* Q1 helpers */
/** A keyed numeric record: (integer key, double value). */
type Input = Tuple2[Int, Double]
/** Result records have exactly the same shape as [[Input]]. */
type Output = Input
/** Joins two keyed RDDs and subtracts the matching values.
  *
  * The join is Spark's inner `join`, so only keys present in both inputs appear in the result.
  *
  * @param r1 records supplying the left-hand operand (minuend) for each key
  * @param r2 records supplying the right-hand operand (subtrahend) for each key
  * @return for every pair of records with the same key, (key, value from r1 - value from r2)
  */
def f(r1: RDD[Input], r2: RDD[Input]):RDD[Output] = {
  // mapValues never touches the keys, so the partitioner set up by the join is retained;
  // a plain map would drop it and force a reshuffle in any later key-based operation.
  r1.join(r2).mapValues { case (left, right) => left - right }
}
/* Q2 helpers */
/** Flattens each (key, array of pairs) record into one (key, pair) row per array element,
  * using explicit flatMap/map calls.
  */
def g1(in: RDD[(Int,Array[(Int,Int)])]):RDD[(Int,(Int,Int))] = {
  in.flatMap { case (key, pairs) =>
    pairs.map(pair => (key, pair))
  }
}
/** Flattens each (key, array of pairs) record into one (key, pair) row per array element,
  * written as a for-comprehension.
  */
def g2(in: RDD[(Int,Array[(Int,Int)])]):RDD[(Int,(Int,Int))] = {
  // Generators bind plain variables only: a tuple pattern here would desugar to withFilter,
  // which RDD does not provide.
  for (record <- in; pair <- record._2) yield (record._1, pair)
}
/* Q3 helpers */
/** Renders a record as "key--match", where match is the first list element equal to the key.
  *
  * @param a a (key, values) pair
  * @return the key and the matching element joined by "--", e.g. "3--3"
  * @throws IndexOutOfBoundsException if no element of the list equals the key
  */
def transform(a:(Int, List[Int])):String = {
  val (key, values) = a
  // find stops at the first hit instead of building a whole filtered list just to read its head.
  values.find(_ == key) match {
    case Some(hit) => s"$key--$hit"
    case None =>
      // Same exception type the indexing version raised, but with a message that names the key.
      throw new IndexOutOfBoundsException(s"no element equal to key $key in its list")
  }
}
/** Renders every record with `transform`, collects the resulting strings and prints them
  * one per line.
  */
def h(in:RDD[(Int, List[Int])]):Unit = {
  val rendered = in.map(transform).collect()
  rendered.foreach(println)
}
/* solutions Q1*/
// Q1 demo: both inputs hold the single key 1, so the join yields exactly one row.
val r1 = sc.parallelize(List(1 -> 3.6))
val r2 = sc.parallelize(List(1 -> 1.1))
f(r1, r2).collect().foreach(println)
// expected output:
// (1,2.5)
/* solutions Q2 */
// Q2 demo: g1 and g2 are run on the same input and print the same rows.
val rdd1 = sc.parallelize(Seq(
  (1, Array((3,4), (4,5))),
  (2, Array((4,2), (4,4), (3,9)))
))
g1(rdd1).collect().foreach(println)
// (1,(3,4))
// (1,(4,5))
// (2,(4,2))
// (2,(4,4))
// (2,(3,9))
g2(rdd1).collect().foreach(println)
// (1,(3,4))
// (1,(4,5))
// (2,(4,2))
// (2,(4,4))
// (2,(3,9))
/* solutions Q3 */
// Q3 demo: keys 1 to 4, each paired with the same list; every key is found in its list.
val rdd2 = sc.parallelize((1 to 4).map(key => (key, List(1,2,3,4))))
// h wraps the map-transform / collect / println pipeline.
h(rdd2)
// 1--1
// 2--2
// 3--3
// 4--4
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment