/*
This is a collection of examples about Apache Spark's RDD API. These examples aim to help me test the RDD functionality.
References:
http://spark.apache.org/docs/latest/programming-guide.html
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html
*/
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.Map
import scala.io.StdIn.readInt
object RddAPI {
var sc : SparkContext = _
val mapApi = Map(
1->"aggregate: two different reduce functions to the RDD",
2->"aggregateByKey [Pair]: aggregation is applied to the values with the same key",
3->"combineByKey [Pair]: combines the values of a RDD consisting of two-component tuples by applying multiple aggregators one after another",
4->"countByKey [Pair]: counts the values of a RDD consisting of two-component tuples for each distinct key separately",
5->"countByValue: frequency distribution",
6->"filterByRange [Ordered]: data in key value pairs and already been sorted by key",
7->"foldByKey",
8->"forEachPartition",
9->"glom: assembles an array that contains all elements of the partition and embeds it in an RDD",
10->"groupBy",
11->"groupByKey [Pair]",
12->"histogram [Double]",
13->"lookup",
14->"mapPartitions",
15->"reduceByKey [Pair]",
16->"Sample functions: sampleByKey [Pair] - takeSample",
17->"Sort: sort, sortByKey [Ordered]",
18->"subtractByKey [Pair]",
19-> "Tree functions: treeAggregate - treeReduce"
)
def main(args: Array[String]): Unit =
{
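// Only needed on Windows: points Hadoop/Spark at a local winutils.exe installation (assumed to live under c:/winutil/bin)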
System.setProperty("hadoop.home.dir", "c:/winutil/")
var exit = 1
println("SPARK RDD API")
val theSparkConf: SparkConf = new SparkConf().setAppName("Spark RDD Api Example")
.setMaster("local[1]")
sc = new SparkContext(theSparkConf)
val log: Logger = Logger.getRootLogger
log.setLevel(Level.OFF)
while (exit != 0) {
val apiChoice = getApiChoice
mapApi.get(apiChoice).foreach(println)
apiChoice match {
case 1 => runAggregateFunction
case 2 => runAggregateByKey
case 3 => runCombineByKey
case 4 => runCountByKey
case 5 => runCountByValue
case 6 => runFilterByRange
case 7 => runFoldByKey
case 8 => runForEachPartition
case 9 => runGlom
case 10 => runGroupBy
case 11 => runGroupByKey
case 12 => runHistogram
case 13 => runLookUp
case 14 => runMapPartitions
case 15 => runReduceByKey
case 16 => runSampleFunctions
case 17 => runSortingFunctions
case 18 => runSubtractByKey
case 19 => runTreeFunctions
case _ => println("Invalid choice; nothing to run")
}
endFunction
println("Enter 0 to quit. Any other number to run other functions")
exit = readInt()
}
sc.stop()
}
def getApiChoice: Int =
{
println("Select an API")
mapApi.toSeq.sortBy(p => p._1).foreach{case(k, v) => println(k + " -> " + v)}
val choice = readInt()
choice
}
def runAggregateFunction: Unit =
{
println("***********************************************************************")
println("def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U\n" +
"The aggregate function allows the user to apply two different reduce functions to the RDD.\n" +
"The first reduce function is applied within each partition to reduce the data within each partition into a single result.\n" +
"The second reduce function is used to combine the different reduced results of all partitions together to arrive at one final result.\n" +
"The ability to have two separate reduce functions for intra partition versus across partition reducing adds a lot of flexibility.\n" +
"For example the first reduce function can be the max function and the second one can be the sum function.\n" +
"The user also specifies an initial value.\n" +
"The initial value is applied at both levels of reduce. So both at the intra partition reduction and across partition reduction.\n" +
"Both reduce functions have to be commutative and associative.\n" +
"Do not assume any execution order for either partition computations or combining partitions.")
val t1 = sc.parallelize(List(1,2,3,4,5,6), 2)
println("sc.parallelize(List(1,2,3,4,5,6), 2) created")
// let's first print out the contents of the RDD with partition labels
def myfunc1(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
val p = t1.mapPartitionsWithIndex(myfunc1).collect
println("z.mapPartitionsWithIndex(myfunc) for partition data visualization")
p.foreach(println)
var r1 : Int = t1.aggregate(0)(math.max(_, _), _ + _)
println("z.aggregate(0)(math.max(_, _), _ + _) executed")
println("res:" + r1)
println("z.aggregate(5)(math.max(_, _), _ + _) executed")
r1 = t1.aggregate(5)(math.max(_, _), _ + _)
println("res:" + r1)
println("This example returns 16 since the initial value is 5\n" +
"Reduce of partition 0 will be max(5, 1, 2, 3) = 5\n" +
"Reduce of partition 1 will be max(5, 4, 5, 6) = 6\n" +
"Final reduce across partitions will be 5 + 5 + 6 = 16\n" +
"Note the final reduce include the initial value")
println("")
def myfunc2(index: Int, iter: Iterator[(String)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
val t2 = sc.parallelize(List("12","23","345","4567"),2)
println("sc.parallelize(List(\"12\",\"23\",\"345\",\"4567\" created")
val p2 = t2.mapPartitionsWithIndex(myfunc2).collect
println("t2.mapPartitionsWithIndex(myfunc) for partition data visualization")
p2.foreach(println)
val r3 = t2.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
println("t2.aggregate(\"\")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y) executed")
println("res:" + r3)
println("This example returns 24 because\n" +
"Reduce of partition 0 will be max(\"\", len=2) = 2 as String\n" +
"Reduce of partition 1 will be max(\"\", len=4) = 4 as String \n" +
"Final reduce across partitions will be 24 as String\n" +
"Note the final reduce include the initial value (empty string)")
println("")
val r4 = t2.aggregate(" ")(
(x,y) => math.min(x.length, y.length).toString,
(x,y) => x + y
)
println("t2.aggregate(\"\")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y) executed")
println("res:" + r4)
endFunction
}
def runAggregateByKey: Unit =
{
println("***********************************************************************")
println("def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]\n" +
"Works like the aggregate function except the aggregation is applied to the values with the same key.\n" +
"Also unlike the aggregate function the initial value is not applied to the second reduce.")
val pairRDD = sc.parallelize(List( ("cat", 2),
("cat", 5),
("mouse", 4),
("cat", 12),
("dog", 12),
("mouse", 2)), 2)
println("A pair RDD")
pairRDD.foreach(println)
println("")
println("Partitions data")
def myfunc(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
val p = pairRDD.mapPartitionsWithIndex(myfunc).collect
p.foreach(println)
println("")
val r1 = pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
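// Expected (key order may vary): (dog,12), (cat,17), (mouse,6)
// Per-partition maxima: partition 0 -> cat 5, mouse 4; partition 1 -> cat 12, dog 12, mouse 2; summing them per key gives the result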
println("res:")
r1.foreach(println)
println("")
val r2 = pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
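// Expected (key order may vary): (dog,100), (cat,200), (mouse,200) - the zero value 100 exceeds every value, so each per-partition max is 100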
println("res:")
r2.foreach(println)
endFunction
}
def runCombineByKey: Unit =
{
println("***********************************************************************")
println("def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]\n" +
"Very efficient implementation that combines the values of a RDD consisting of two-component tuples\n" +
"by applying multiple aggregators one after another")
val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
println("sc.parallelize(List(\"dog\",\"cat\",\"gnu\",\"salmon\",\"rabbit\",\"turkey\",\"wolf\",\"bear\",\"bee\"), 3)\n" +
"sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)" )
val c : RDD[(Int, String)] = b.zip(a)
println("b.zip(a")
val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y)
d.collect.foreach(println)
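// Expected (element order within each list may vary): (1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf))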
endFunction
}
def runCountByKey: Unit =
{
println("***********************************************************************")
println("def countByKey(): Map[K, Long]")
println("Very similar to count, but counts the values of a RDD consisting of two-component tuples for each distinct key separately. ")
val c = sc.parallelize(List((3, "Gnu"), (3, "Yak"), (5, "Mouse"), (3, "Dog")), 2)
println("sc.parallelize(List((3, \"Gnu\"), (3, \"Yak\"), (5, \"Mouse\"), (3, \"Dog\")), 2)")
val res: Map[Int, Long] = c.countByKey
println("c.countByKey")
res.foreach{case (k,v) => println(k + "->" + v)}
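// Expected: 3 -> 3, 5 -> 1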
endFunction
}
def runCountByValue: Unit =
{
println("***********************************************************************")
println("def countByValue(): Map[T, Long]")
println("Returns a map that contains all unique values of the RDD and their respective occurrence counts.\n" +
"(Warning: This operation will finally aggregate the information in a single reducer.)")
val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1))
val res : Map[Int,Long] = b.countByValue
println("sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1))\n" +
"b.countByValue")
res.toSeq.sortBy(_._1).foreach{case (k,v) => println(k + "->" + v)}
//res27: scala.collection.Map[Int,Long] = Map(5 -> 1, 8 -> 1, 3 -> 1, 6 -> 1, 1 -> 6, 2 -> 3, 4 -> 2, 7 -> 1)
println("***********************************************************************")
println("")
println("")
}
def runFilterByRange: Unit =
{
println("***********************************************************************")
println("def filterByRange(lower: K, upper: K): RDD[P]")
println("Returns an RDD containing only the items in the key range specified.\n" +
"From testing, it appears this only works if your data is in key value pairs and it has already been sorted by key.")
val randRDD = sc.parallelize(List( (2,"cat"), (6, "mouse"),(7, "cup"), (3, "book"), (4, "tv"), (1, "screen"), (5, "heater")), 3)
randRDD.foreach(println)
println("")
val sortedRDD = randRDD.sortByKey()
sortedRDD.foreach(println)
println("")
println("sortedRDD.filterByRange(1, 3)")
val res : Array[(Int, String)] = sortedRDD.filterByRange(1, 3).collect
res.foreach(println)
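// Expected: (1,screen), (2,cat), (3,book)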
println("***********************************************************************")
println("")
println("")
}
def runFoldByKey: Unit =
{
println("***********************************************************************")
println("def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]")
println("Very similar to fold, but performs the folding separately for each key of the RDD.\n" +
"This is only available if the RDD consists of two-component tuples")
var a : RDD[String] = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
var b : RDD[(Int, String)] = a.map(x => (x.length, x))
b.foreach(println)
println("")
println("b.foldByKey(\"\")(_ + _)")
var res = b.foldByKey("")(_ + _).collect
res.foreach(println)
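// Expected: (3,dogcatowlgnuant) - all five words have length 3 (concatenation order may vary)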
println("")
a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
b = a.map(x => (x.length, x))
b.foreach(println)
println("")
println("b.foldByKey(\"\")(_ + _)")
res = b.foldByKey("")(_ + _).collect
println("")
res.foreach(println)
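// Expected (order may vary): (4,lion), (3,dogcat), (7,panther), (5,tigereagle)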
endFunction
}
def runForEachPartition: Unit =
{
println("***********************************************************************")
println("def foreachPartition(f: Iterator[T] => Unit)")
println("Executes an parameterless function for each partition.\n" +
"Access to the data items contained in the partition is provided via the iterator argument.")
val b = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
def myfunc(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
val p = b.mapPartitionsWithIndex(myfunc).collect
println("z.mapPartitionsWithIndex(myfunc) for partition data visualization")
p.foreach(println)
println("Results")
b.foreachPartition(x => println(x.reduce(_ + _)))
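// Expected output (one line per partition, printed by the workers, order may vary): 6, 15, 24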
endFunction
}
def runGlom: Unit =
{
println("***********************************************************************")
println("def glom(): RDD[Array[T]]")
println("Assembles an array that contains all elements of the partition and embeds it in an RDD.\n" +
"Each returned array contains the contents of one partition.")
val a = sc.parallelize(1 to 20, 3)
def myfunc(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
val p = a.mapPartitionsWithIndex(myfunc).collect
println("z.mapPartitionsWithIndex(myfunc) for partition data visualization")
p.foreach(println)
val res : Array[Array[Int]] = a.glom.collect
for(p <- 0 until 3) {
print("Array [Partition" + (p+1) +"]: ")
for(e <- 0 until res(p).length)
print(res(p)(e) + " \t")
println("")
}
endFunction
}
def runGroupBy: Unit =
{
println("***********************************************************************")
println("def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])]")
println("Return an RDD of grouped items by a boolean function. Each group consists of a key and a sequence of elements mapping to that key.\n" +
"This operation may be very expensive. If you are grouping in order to perform an aggregation (such as a sum or average) over each key,\n" +
"using PairRDDFunctions.aggregateByKey or PairRDDFunctions.reduceByKey will provide much better performance")
var a = sc.parallelize(1 to 9, 3)
var res : RDD[(String, Iterable[Int])] = a.groupBy(x => { if (x % 2 == 0) "even" else "odd" })
println("sc.parallelize(1 to 9, 3)\na.groupBy(x => { if (x % 2 == 0) \"even\" else \"odd\" })")
val t : Array[(String, Iterable[Int])] = res.collect
t.foreach(println)
//res42: Array[(String, Seq[Int])] = Array((even,ArrayBuffer(2, 4, 6, 8)), (odd,ArrayBuffer(1, 3, 5, 7, 9)))
a = sc.parallelize(1 to 9, 3)
def myfunc(a: Int) : Int = a % 2
println("myfunc(a: Int) : Int = a % 2\na.groupBy(myfunc)")
val b : RDD[(Int, Iterable[Int])] = a.groupBy(myfunc)
val k : Array[(Int, Iterable[Int])] = b.collect
k.foreach(println)
endFunction
}
def runGroupByKey: Unit =
{
println("***********************************************************************")
println("def groupByKey(): RDD[(K, Iterable[V])]")
println("Very similar to groupBy, but instead of supplying a function,\n" +
"the key-component of each pair will automatically be presented to the partitioner.")
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)
val b = a.keyBy(_.length)
val res = b.groupByKey.collect
println("sc.parallelize(List(\"dog\", \"tiger\", \"lion\", \"cat\", \"spider\", \"eagle\"), 2)")
println("a.keyBy(_.length)")
println("b.groupByKey.collect")
res.foreach(println)
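// Expected groups (order and Iterable type printed depend on the Spark version): 4 -> (lion), 6 -> (spider), 3 -> (dog, cat), 5 -> (tiger, eagle)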
endFunction
}
def runHistogram: Unit =
{
println("***********************************************************************")
println("1 def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]]")
println("Compute a histogram of the data using bucketCount number of buckets evenly spaced between the minimum and maximum of the RDD.\n" +
"For example if the min value is 0 and the max is 100 and there are two buckets the resulting buckets will be [0, 50) [50, 100].\n" +
"BucketCount must be at least 1 If the RDD contains infinity, \n" +
"NaN throws an exception If the elements in RDD do not vary (max == min) always returns a single bucket.")
println("sc.parallelize(List(1.1, 1.2, 1.3, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 9.0), 3)")
println("a.histogram(5)")
val a = sc.parallelize(List(1.1, 1.2, 1.3, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 9.0), 3)
val res = a.histogram(5)
val bucketBoundaries = res._1
val frequencies = res._2
// buckets are [lower, upper), except the last one which also includes the upper bound
for(b <- 1 until bucketBoundaries.length)
println(bucketBoundaries(b-1) + " <= x < " + bucketBoundaries(b) + ": " + frequencies(b-1))
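// Expected: boundaries Array(1.1, 2.68, 4.26, 5.84, 7.42, 9.0) with counts Array(5, 0, 0, 1, 4)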
println("")
println("2: def histogram(buckets: Array[Double], evenBuckets: Boolean = false): Array[Long]\n" +
"Compute a histogram using the provided buckets.\n" +
"The buckets are all open to the right except for the last which is closed. e.g. for the array [1, 10, 20, 50]\n" +
"the buckets are [1, 10) [10, 20) [20, 50] e.g <=x<10, 10<=x<20, 20<=x<=50\n" +
"And on the input of 1 and 50 we would have a histogram of 1, 0, 1" +
"If your histogram is evenly spaced (e.g. [0, 10, 20, 30]) this can be switched from an O(log n) insertion to O(1) per element\n" +
"(where n = # buckets). If you set evenBuckets to true. buckets must be sorted and not contain any duplicates.\n" +
"Buckets array must be at least two elements All NaN entries are treated the same.\n" +
"If you have a NaN bucket it must be the maximum value of the last position and all NaN entries will be counted in that bucket.")
val adBuckets = Array(0.0, 3.0, 8.0)
println("a.histogram(Array(0.0, 3.0, 8.0)")
val k: Array[Long] = a.histogram(adBuckets)
for(b <- 1 until adBuckets.length)
println(adBuckets(b-1) + " <= x < " + adBuckets(b) + ": " + k(b-1))
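// Expected: Array(5, 3) - values above the last bucket boundary (8.8 and 9.0) are not counted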
}
def runLookUp: Unit =
{
println("***********************************************************************")
println("def lookup(key: K): Seq[V]")
println("Scans the RDD for all keys that match the provided value and returns their values as a Scala sequence.")
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
println("sc.parallelize(List(\"dog\", \"tiger\", \"lion\", \"cat\", \"panther\", \"eagle\"), 2")
println("a.map(x => (x.length, x)")
println("b.lookup(5)")
val res = b.lookup(5) //res0: Seq[String] = WrappedArray(tiger, eagle)
res.foreach(println)
}
def runMapPartitions: Unit =
{
println("***********************************************************************")
println("def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]")
println("This is a specialized map that is called only once for each partition.\n" +
"The entire content of the respective partitions is available as a sequential stream of values via the input argument (Iterarator[T]).\n" +
"The custom function must return yet another Iterator[U]. The combined result iterators are automatically converted into a new RDD.\n" +
"Please note, that the tuples (3,4) and (6,7) are missing from the following result due to the partitioning we chose..")
val a = sc.parallelize(1 to 9, 3)
def myfunc1(index: Int, iter: Iterator[Int]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
val p = a.mapPartitionsWithIndex(myfunc1).collect
p.foreach(println)
println("")
def myfunc2[T](iter: Iterator[T]) : Iterator[(T, T)] = {
// pairs each element with its successor within the partition
var res = List[(T, T)]()
var pre = iter.next
while (iter.hasNext) {
val cur = iter.next
res = (pre, cur) :: res
pre = cur
}
res.iterator
}
println("sc.parallelize(1 to 9, 3)")
println("a.mapPartitions(myfunc2).collect")
val res = a.mapPartitions(myfunc2).collect
//res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
res.foreach(println)
}
def runReduceByKey: Unit =
{
println("***********************************************************************")
println("def reduceByKey(func: (V, V) => V): RDD[(K, V)]")
println("This function provides the well-known reduce functionality in Spark.\n" +
"Please note that any function f you provide, should be commutative in order to generate reproducible results.")
println("sc.parallelize(List(\"dog\", \"cat\", \"owl\", \"gnu\", \"ant\"), 2)")
println("a.map(x => (x.length, x))")
println("b.reduceByKey(_ + _).collect")
var a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
var b = a.map(x => (x.length, x))
var res = b.reduceByKey(_ + _).collect
//res86: Array[(Int, String)] = Array((3,dogcatowlgnuant))
res.foreach(println)
println("")
println("sc.parallelize(List(\"dog\", \"tiger\", \"lion\", \"cat\", \"panther\", \"eagle\"), 2)")
println("a.map(x => (x.length, x))")
println("b.reduceByKey(_ + _).collect")
a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
b = a.map(x => (x.length, x))
res = b.reduceByKey(_ + _).collect
//res87: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))
res.foreach(println)
}
def runSampleFunctions: Unit =
{
println("***********************************************************************")
println("sampleByKey")
println("def sampleByKey(withReplacement: Boolean, fractions: Map[K, Double], seed: Long = Utils.random.nextLong): RDD[(K, V)]")
println("Randomly samples the key value pair RDD according to the fraction of each key you want to appear in the final RDD.")
println("sc.parallelize(List( (1,\"cat\"), (0, \"mouse\"),(1, \"cup\"), (0, \"book\"), (1, \"tv\"), (0, \"screen\"), (1, \"heater\"))")
println("andRDD.sampleByKey(false, Map( 1 -> 0.4, 0 -> 0.6 ), 42)")
val randRDD = sc.parallelize(List( (1,"cat"), (0, "mouse"),(1, "cup"), (0, "book"), (1, "tv"), (0, "screen"), (1, "heater")))
val sampleMap = Map( 1 -> 0.4, 0 -> 0.6 )
val res = randRDD.sampleByKey(false, sampleMap, 42).collect
res.foreach(println)
println("")
println("takeSample")
println("def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T]")
println("Behaves different from sample in the following respects:\n" +
"It will return an exact number of samples 2nd parameter)\n" +
"It returns an Array instead of RDD. It internally randomizes the order of the items returned")
println("sc.parallelize(1 to 1000, 3)")
println("x.takeSample(withReplacement = true, 100, 1)")
val x = sc.parallelize(1 to 1000, 3)
var res2 = x.takeSample(true, 100, 1)
for(e <- 1 to 100) {
if( e % 10 != 0)
print(res2(e-1) + "\t")
else
print(res2(e-1) + "\n")
}
println("")
println("Distinct count: " + res2.distinct.length + " on 100")
println("x.takeSample(withReplacement = true, 100, 1)")
res2 = x.takeSample(false, 100, 1)
println("Distinct count: " + res2.distinct.length + " on 100")
}
def runSortingFunctions: Unit =
{
println("***********************************************************************")
println("sort")
println("def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.size)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]")
println("This function sorts the input RDD's data and stores it in a new RDD.\n" +
"The first parameter requires you to specify a function which maps the input data into the key that you want to sortBy.\n" +
"The second parameter (optional) specifies whether you want the data to be sorted in ascending or descending order")
//val y = sc.parallelize(Array(5, 7, 1, 3, 2, 1))
//y.sortBy(c => c, true).collect
//res101: Array[Int] = Array(1, 1, 2, 3, 5, 7)
println("sc.parallelize(Array((\"H\", 10), (\"A\", 26), (\"Z\", 1), (\"L\", 5))")
println("z.sortBy(c => c._1, true)")
val z = sc.parallelize(Array(("H", 10), ("A", 26), ("Z", 1), ("L", 5)))
var res = z.sortBy(c => c._1, true).collect
res.foreach(println)
println("z.sortBy(c => c._2, true)")
res = z.sortBy(c => c._2, true).collect
res.foreach(println)
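// Expected: sorted by key -> (A,26), (H,10), (L,5), (Z,1); sorted by value -> (Z,1), (L,5), (H,10), (A,26)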
}
def runSubtractByKey: Unit =
{
println("***********************************************************************")
println("subtractByKey")
println("def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]")
println("Very similar to subtract, but instead of supplying a function, the key-component of each pair will be automatically used\n" +
"as criterion for removing items from the first RDD.")
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)
val b = a.keyBy(_.length)
b.foreach{case (k,v) => println(k + "->" + v)}
println("")
val c = sc.parallelize(List("ant", "falcon", "squid"), 2)
val d = c.keyBy(_.length)
d.foreach{case (k,v) => println(k + "->" + v)}
println("")
println("b.subtractByKey(d).collect")
val res = b.subtractByKey(d).collect
//res15: Array[(Int, String)] = Array((4,lion))
res.foreach{case (k,v) => println(k + "->" + v)}
}
def runTreeFunctions: Unit =
{
println("***********************************************************************")
println("treeAggregate")
println("def treeAggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U, depth: Int = 2)(implicit arg0: ClassTag[U]): U")
println("Computes the same thing as aggregate, except it aggregates the elements of the RDD in a multi-level tree pattern.\n" +
"It uses the initial value for the second reduce function (combOp).\n" +
"By default a tree of depth 2 is used, but this can be changed via the depth parameter.")
val t1 = sc.parallelize(List(1,2,3,4,5,6), 2)
println("sc.parallelize(List(1,2,3,4,5,6), 2) created")
// let's first print out the contents of the RDD with partition labels
def myfunc1(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
val p = t1.mapPartitionsWithIndex(myfunc1).collect
println("z.mapPartitionsWithIndex(myfunc) for partition data visualization")
p.foreach(println)
var r1 : Int = t1.treeAggregate(0)(math.max(_, _), _ + _)
println("t1.treeAggregate(0)(math.max(_, _), _ + _) executed")
println("res:" + r1)
println("t1.treeAggregate(5)(math.max(_, _), _ + _) executed")
r1 = t1.treeAggregate(5)(math.max(_, _), _ + _)
println("res:" + r1)
println("This example returns 11 since the initial value is 5\n" +
"Reduce of partition 0 will be max(5, 1, 2, 3) = 5\n" +
"Reduce of partition 1 will be max(4, 5, 6) = 6\n" +
"Final reduce across partitions will be 5 + 6 = 11\n" +
"The final reduce INCLUDE the initial value")
println("")
println("treeReduce")
println("def treeReduce(f: (T, T) ⇒ T, depth: Int = 2): T")
println("Works like reduce except reduces the elements of the RDD in a multi-level tree pattern.")
val z = sc.parallelize(List(1,2,3,4,5,6), 2)
println("sc.parallelize(List(1,2,3,4,5,6), 2)")
println("z.treeReduce(_+_)")
val res = z.treeReduce(_+_)
println("Res: " + res)
}
def endFunction: Unit =
{
println("***********************************************************************")
println("")
println("")
}
}