Last active
July 6, 2017 13:19
-
-
Save svillafe/e9efc21e1039b310e71e3c4207576844 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package wikipedia | |
import org.apache.spark.SparkConf | |
import org.apache.spark.SparkContext | |
import org.apache.spark.SparkContext._ | |
import org.apache.spark.rdd.RDD | |
/** One Wikipedia article.
 *
 *  @param title the article's title
 *  @param text  the article's text; this is the field the ranking functions search
 */
case class WikipediaArticle(
  title: String,
  text: String
)
object WikipediaRanking { | |
/** The programming languages whose mentions are ranked. */
val langs: List[String] = List(
  "JavaScript", "Java", "PHP", "Python", "C#", "C++", "Ruby", "CSS",
  "Objective-C", "Perl", "Scala", "Haskell", "MATLAB", "Clojure", "Groovy")

/** Spark configuration: master "local", application name "Wikipedia". */
val conf: SparkConf =
  new SparkConf()
    .setMaster("local")
    .setAppName("Wikipedia")

/** The Spark context shared by every computation in this object. */
val sc: SparkContext = new SparkContext(conf)

/** All articles, read line by line from `WikipediaData.filePath` and parsed with
 *  `WikipediaData.parse`. Persisted because `main` runs three rankings over it.
 */
val wikiRdd: RDD[WikipediaArticle] = {
  val rawLines = sc.textFile(WikipediaData.filePath)
  rawLines.map(WikipediaData.parse).persist()
}
/** Counts the articles of `rdd` whose text mentions `lang`.
 *
 *  The text is split on single blanks and `lang` must equal one of the resulting
 *  tokens, so "JavaScript" is not counted as a mention of "Java". Titles are not
 *  searched.
 *
 *  @param lang the language name to look for
 *  @param rdd  the articles to scan
 *  @return the number of articles with at least one token equal to `lang`
 */
def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int = {
  // A function value (not a local method) so the Spark closure only captures `lang`.
  val mentionsLang: WikipediaArticle => Boolean =
    article => article.text.split(" ").contains(lang)

  rdd.aggregate(0)(
    (count, article) => count + (if (mentionsLang(article)) 1 else 0),
    _ + _
  )
}
/** (1) Ranks `langs` by the number of articles mentioning each language, in
 *  decreasing order of that number.
 *
 *  Calls `occurrencesOfLang` once per language, so `rdd` is scanned once per
 *  language; this can take several seconds.
 *
 *  @param langs the languages to rank
 *  @param rdd   the articles to scan
 *  @return one `(language, articleCount)` pair per language, highest count first
 */
def rankLangs(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] = {
  val counted = for (lang <- langs) yield lang -> occurrencesOfLang(lang, rdd)
  counted.sortBy { case (_, occurrences) => occurrences }(Ordering[Int].reverse)
}
/** Computes an inverted index of the articles, mapping each language to the
 *  articles in which it occurs.
 *
 *  A language occurs in an article when it equals one of the tokens obtained by
 *  splitting the article text on single blanks. Languages that occur in no
 *  article have no entry in the result.
 *
 *  @param languages the languages to index
 *  @param rdd       the articles to scan
 *  @return an RDD of `(language, articles mentioning it)` pairs
 */
def makeIndex(languages: List[String], rdd: RDD[WikipediaArticle]): RDD[(String, Iterable[WikipediaArticle])] = {
  val mentions = rdd.flatMap { article =>
    // Split once per article instead of once per (article, language) pair.
    val tokens = article.text.split(" ")
    languages
      .filter(language => tokens.contains(language))
      .map(language => (language, article))
  }
  // groupByKey builds the per-language collections directly, replacing the
  // one-element Iterables that were concatenated pairwise with `++`.
  mentions.groupByKey()
}
/** (2) Computes the language ranking from an inverted index.
 *
 *  Each language's count is the size of its article collection. Languages that
 *  are absent from `index` are absent from the result. This can take several
 *  seconds.
 *
 *  @param index `(language, articles mentioning it)` pairs
 *  @return `(language, articleCount)` pairs, highest count first
 */
def rankLangsUsingIndex(index: RDD[(String, Iterable[WikipediaArticle])]): List[(String, Int)] = {
  val articleCounts = index.mapValues(articles => articles.size)
  val collected = articleCounts.collect().toList
  collected.sortBy { case (_, count) => count }(Ordering[Int].reverse)
}
/** (3) Computes the language ranking in one pass, combining the index
 *  computation and the counting through `reduceByKey`.
 *
 *  Every article contributes one pair per language: 1 when the language equals
 *  one of the blank-separated tokens of the article text, 0 otherwise. This can
 *  take several seconds.
 *
 *  @param languages the languages to rank
 *  @param rdd       the articles to scan
 *  @return `(language, articleCount)` pairs, highest count first
 */
def rankLangsReduceByKey(languages: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] = {
  val hits = rdd.flatMap { article =>
    // Split once per article instead of once per (article, language) pair.
    val tokens = article.text.split(" ")
    // Non-matching languages still emit a 0, so they are reported with count 0.
    languages.map(language => (language, if (tokens.contains(language)) 1 else 0))
  }
  hits
    .reduceByKey(_ + _)
    .collect()
    .toList
    .sortBy { case (_, count) => count }(Ordering[Int].reverse)
}
/** Runs the three ranking strategies over `wikiRdd`, timing each one, then
 *  prints the collected timings.
 *
 *  @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  try {
    /* Languages ranked according to (1) */
    val langsRanked: List[(String, Int)] = timed("Part 1: naive ranking", rankLangs(langs, wikiRdd))

    /* An inverted index mapping languages to wikipedia pages on which they appear.
     * Kept as a `def` so the index is built inside the timed section of part 2. */
    def index: RDD[(String, Iterable[WikipediaArticle])] = makeIndex(langs, wikiRdd)

    /* Languages ranked according to (2), using the inverted index */
    val langsRanked2: List[(String, Int)] = timed("Part 2: ranking using inverted index", rankLangsUsingIndex(index))

    /* Languages ranked according to (3) */
    val langsRanked3: List[(String, Int)] = timed("Part 3: ranking using reduceByKey", rankLangsReduceByKey(langs, wikiRdd))

    /* Output the speed of each ranking */
    println(timing)
  } finally {
    // Stop the Spark context even when one of the rankings throws.
    sc.stop()
  }
}
/** Accumulates one line per `timed` call. */
val timing: StringBuffer = new StringBuffer()

/** Evaluates `code` once, appends its wall-clock duration to `timing`, and
 *  returns its result.
 *
 *  @param label description inserted into the timing line
 *  @param code  by-name computation to measure
 *  @return whatever `code` evaluates to
 */
def timed[T](label: String, code: => T): T = {
  val startedAt = System.currentTimeMillis()
  val outcome = code
  val elapsed = System.currentTimeMillis() - startedAt
  timing.append(s"Processing $label took $elapsed ms.\n")
  outcome
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment