Skip to content

Instantly share code, notes, and snippets.

@svillafe
Last active July 6, 2017 13:19
Show Gist options
  • Save svillafe/e9efc21e1039b310e71e3c4207576844 to your computer and use it in GitHub Desktop.
Save svillafe/e9efc21e1039b310e71e3c4207576844 to your computer and use it in GitHub Desktop.
package wikipedia
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
case class WikipediaArticle(title: String, text: String)
object WikipediaRanking {
val langs = List(
"JavaScript", "Java", "PHP", "Python", "C#", "C++", "Ruby", "CSS",
"Objective-C", "Perl", "Scala", "Haskell", "MATLAB", "Clojure", "Groovy")
val conf: SparkConf = new SparkConf().setMaster("local").setAppName("Wikipedia")
val sc: SparkContext = new SparkContext(conf)
// Hint: use a combination of `sc.textFile`, `WikipediaData.filePath` and `WikipediaData.parse`
val wikiRdd: RDD[WikipediaArticle] = sc.textFile(WikipediaData.filePath).map(WikipediaData.parse).persist()
/** Returns the number of articles on which the language `lang` occurs.
* Hint1: consider using method `aggregate` on RDD[T].
* Hint2: should you count the "Java" language when you see "JavaScript"?
* Hint3: the only whitespaces are blanks " "
* Hint4: no need to search in the title :)
*/
def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int =
rdd.aggregate(0)(
(runningTotal,article) => if(article.text.split(" ").contains(lang)) runningTotal + 1 else runningTotal,
(x, y) => x + y
)
/* (1) Use `occurrencesOfLang` to compute the ranking of the languages
* (`val langs`) by determining the number of Wikipedia articles that
* mention each language at least once. Don't forget to sort the
* languages by their occurrence, in decreasing order!
*
* Note: this operation is long-running. It can potentially run for
* several seconds.
*/
def rankLangs(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] = {
langs.map(l => (l, occurrencesOfLang(l, rdd))).sortBy(i => i._2)(Ordering[Int].reverse)
}
/* Compute an inverted index of the set of articles, mapping each language
* to the Wikipedia pages in which it occurs.
*/
def makeIndex(languages: List[String], rdd: RDD[WikipediaArticle]): RDD[(String, Iterable[WikipediaArticle])] = {
val response = rdd.flatMap( article => {
val languagesIncluded = languages.filter(language => article.text.split(" ").contains(language))
languagesIncluded.map(language => (language, Iterable(article)))
})
response.reduceByKey((a : Iterable[WikipediaArticle], b: Iterable[WikipediaArticle]) => a ++ b)
}
/* (2) Compute the language ranking again, but now using the inverted index. Can you notice
* a performance improvement?
*
* Note: this operation is long-running. It can potentially run for
* several seconds.
*/
def rankLangsUsingIndex(index: RDD[(String, Iterable[WikipediaArticle])]): List[(String, Int)] =
index.mapValues(t => t.size).collect().toList.sortBy(_._2)(Ordering[Int].reverse)
/* (3) Use `reduceByKey` so that the computation of the index and the ranking are combined.
* Can you notice an improvement in performance compared to measuring *both* the computation of the index
* and the computation of the ranking? If so, can you think of a reason?
*
* Note: this operation is long-running. It can potentially run for
* several seconds.
*/
def rankLangsReduceByKey(languages: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] = {
val k = rdd.flatMap(article => languages.map(language => (language, if (article.text.split(" ").contains(language)) 1 else 0)))
k.reduceByKey(_ +_).collect().toList.sortBy(i => i._2)(Ordering[Int].reverse)
}
def main(args: Array[String]) {
/* Languages ranked according to (1) */
val langsRanked: List[(String, Int)] = timed("Part 1: naive ranking", rankLangs(langs, wikiRdd))
/* An inverted index mapping languages to wikipedia pages on which they appear */
def index: RDD[(String, Iterable[WikipediaArticle])] = makeIndex(langs, wikiRdd)
/* Languages ranked according to (2), using the inverted index */
val langsRanked2: List[(String, Int)] = timed("Part 2: ranking using inverted index", rankLangsUsingIndex(index))
/* Languages ranked according to (3) */
val langsRanked3: List[(String, Int)] = timed("Part 3: ranking using reduceByKey", rankLangsReduceByKey(langs, wikiRdd))
/* Output the speed of each ranking */
println(timing)
sc.stop()
}
val timing = new StringBuffer
def timed[T](label: String, code: => T): T = {
val start = System.currentTimeMillis()
val result = code
val stop = System.currentTimeMillis()
timing.append(s"Processing $label took ${stop - start} ms.\n")
result
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment