// $ curl -s -L http://dumps.wikimedia.org/enwiki/latest/\
// enwiki-latest-pages-articles-multistream.xml.bz2 \
// | bzip2 -cd \
// | hadoop fs -put - /user/ds/wikidump.xml
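// Optional check (assumption, not in the original notes): confirm the dump landed in HDFS; the
// decompressed pages-articles XML is on the order of tens of GB.
// $ hadoop fs -du -h /user/ds/wikidump.xml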
//
// $ sudo yum install -y git
// $ git clone https://github.com/sryza/aas ~/aas
// $ cd ~/aas
// $ sudo wget http://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo
// $ sudo sed -i s/\$releasever/6/g /etc/yum.repos.d/epel-apache-maven.repo
// $ sudo yum install -y apache-maven
// $ mvn package   # run from the repo root -- see https://github.com/sryza/aas/issues/14
// $ cd ch06-lsa
// $ spark-shell --jars target/ch06-lsa-1.0.2-jar-with-dependencies.jar
import com.cloudera.datascience.common.XmlInputFormat
import edu.stanford.nlp.ling.CoreAnnotations.{LemmaAnnotation, SentencesAnnotation, TokensAnnotation}
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}
import edu.umd.cloud9.collection.wikipedia._
import edu.umd.cloud9.collection.wikipedia.language._
import java.io.{FileOutputStream, PrintStream}
import java.util.Properties
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.rdd.RDD
import scala.collection.JavaConverters._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
val path = "hdfs:///user/ds/wikidump.xml"
@transient val conf = new Configuration()
// conf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml
conf.set(XmlInputFormat.START_TAG_KEY, "<page>")
conf.set(XmlInputFormat.END_TAG_KEY, "</page>")
val kvs = sc.newAPIHadoopFile(path, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
// 16/03/21 10:36:26 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 202.9 KB, free 202.9 KB)
// 16/03/21 10:36:26 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 23.8 KB, free 226.8 KB)
// 16/03/21 10:36:26 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.31.26.245:58525 (size: 23.8 KB, free: 518.0 MB)
// 16/03/21 10:36:26 INFO SparkContext: Created broadcast 0 from newAPIHadoopFile at <console>:36
// kvs: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)] = hdfs:///user/ds/wikidump.xml NewHadoopRDD[0] at newAPIHadoopFile at <console>:36
val rawXmls = kvs.map(p => p._2.toString)
// rawXmls: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at map at <console>:38
def wikiXmlToPlainText(pageXml: String): Option[(String, String)] = {
  val page = new EnglishWikipediaPage()
  WikipediaPage.readPage(page, pageXml)
  if (page.isEmpty || !page.isArticle || page.isRedirect ||
      page.getTitle.contains("(disambiguation)")) {
    None
  } else {
    Some((page.getTitle, page.getContent))
  }
}
// wikiXmlToPlainText: (pageXml: String)Option[(String, String)]
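// Optional spot check (not in the original run): parse one record before running the full flatMap.
// wikiXmlToPlainText(rawXmls.first)   // Some((title, body)), or None for redirects/non-articles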
val plainText = rawXmls.flatMap(wikiXmlToPlainText)
// plainText: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at flatMap at <console>:69
def createNLPPipeline(): StanfordCoreNLP = {
  val props = new Properties()
  props.put("annotators", "tokenize, ssplit, pos, lemma")
  new StanfordCoreNLP(props)
}
// createNLPPipeline: ()edu.stanford.nlp.pipeline.StanfordCoreNLP
def isOnlyLetters(str: String): Boolean = {
  // while loop (rather than exists/forall) for performance
  var i = 0
  while (i < str.length) {
    if (!Character.isLetter(str.charAt(i))) {
      return false
    }
    i += 1
  }
  true
}
// isOnlyLetters: (str: String)Boolean
def plainTextToLemmas(text: String, stopWords: Set[String], pipeline: StanfordCoreNLP): Seq[String] = {
  val doc = new Annotation(text)
  pipeline.annotate(doc)
  val lemmas = new ArrayBuffer[String]()
  val sentences = doc.get(classOf[SentencesAnnotation])
  for (sentence <- sentences.asScala;
       token <- sentence.get(classOf[TokensAnnotation]).asScala) {
    val lemma = token.get(classOf[LemmaAnnotation])
    if (lemma.length > 2 && !stopWords.contains(lemma) && isOnlyLetters(lemma)) {
      lemmas += lemma.toLowerCase
    }
  }
  lemmas
}
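// Optional local sanity check (not in the original run): lemmatize one made-up sentence with a
// toy stop-word set, just to confirm the tokenize/ssplit/pos/lemma annotators load. Exact output
// depends on the CoreNLP version; roughly ArrayBuffer(cat, sit, mat) is expected.
// val testPipeline = createNLPPipeline()
// plainTextToLemmas("The cats were sitting on the mats.", Set("the", "were"), testPipeline)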
// assumes the working directory is aas/ch06-lsa
val stopWords = sc.broadcast(scala.io.Source.fromFile("src/main/resources/stopwords.txt").getLines().toSet).value
// 16/03/21 10:59:37 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 15.7 KB, free 242.5 KB)
// 16/03/21 10:59:37 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1114.0 B, free 243.6 KB)
// 16/03/21 10:59:37 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 172.31.26.245:58525 (size: 1114.0 B, free: 518.0 MB)
// 16/03/21 10:59:37 INFO SparkContext: Created broadcast 1 from broadcast at <console>:59
// stopWords: scala.collection.immutable.Set[String] = Set(down, it's, that's, for, further, she'll, any, there's, this, haven't, in, ought, myself, have, your, off, once, i'll, are, is, his, why, too, why's, am, than, isn't, didn't, himself, but,...
val lemmatized = plainText.mapPartitions(iter => {
  val pipeline = createNLPPipeline()
  iter.map { case (title, contents) => (title, plainTextToLemmas(contents, stopWords, pipeline)) }
})
// lemmatized: org.apache.spark.rdd.RDD[(String, Seq[String])] = MapPartitionsRDD[3] at mapPartitions at <console>:79
// 16/03/21 17:54:15 WARN TaskSetManager: Lost task 24.0 in stage 0.0 (TID 56, ip-172-31-13-188.ap-northeast-1.compute.internal): java.lang.IllegalArgumentException: No annotator named tokenize
// at edu.stanford.nlp.pipeline.AnnotatorPool.get(AnnotatorPool.java:83)
// at edu.stanford.nlp.pipeline.StanfordCoreNLP.construct(StanfordCoreNLP.java:292)
// val docTermFreqs = docs.mapValues(terms => {
val docTermFreqs = lemmatized.mapValues(terms => {
  val termFreqsInDoc = terms.foldLeft(new HashMap[String, Int]()) {
    (map, term) => map += term -> (map.getOrElse(term, 0) + 1)
  }
  termFreqsInDoc
})
// docTermFreqs: org.apache.spark.rdd.RDD[(String, scala.collection.mutable.HashMap[String,Int])] = MapPartitionsRDD[4] at mapValues at <console>:81
// this RDD will be used at least twice, so it's a good idea to cache it.
docTermFreqs.cache()
// Too expensive to run on the full dump, so skipped here:
// docTermFreqs.flatMap(_._2.keySet).distinct().count()
// docTermFreqs.map(_._2).aggregate(zero)(merge, comb)
// // comb: (dfs1: scala.collection.mutable.HashMap[String,Int], dfs2: scala.collection.mutable.HashMap[String,Int])scala.collection.mutable.HashMap[String,Int]
// in the github code: documentFrequenciesDistributed(docTermFreqs.map(_._2), numTerms)
val docFreqs = docTermFreqs.flatMap(_._2.keySet).map((_, 1)).reduceByKey(_ + _)
// docFreqs: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[7] at reduceByKey at <console>:81
val numTerms = 50000 // original: 50000
val numDocs = 10 // NOTE: presumably this should be the real document count (docTermFreqs.count()); with 10 here the IDFs below come out negative
val ordering = Ordering.by[(String, Int), Int](_._2)
// *** //
// slow: about 2.5 hours on 12 cores
val topDocFreqs = docFreqs.top(numTerms)(ordering)
// *** //
// 16/03/21 12:22:28 INFO TaskSetManager: Finished task 19.0 in stage 0.0 (TID 15) in 2264441 ms on ip-172-31-18-236.ap-northeast-1.compute.internal (13/421)
val idfs = topDocFreqs.map {
  case (term, count) => (term, math.log(numDocs.toDouble / count))
}.toMap
// idfs: scala.collection.immutable.Map[String,Double] = Map(dredd -> -4.55597994179732, gaiden -> -4.550714000192032, quotient -> -5.633360010359655, clarissa -> -5.196838197598183, incident -> -9.008530138598365, meteorologist -> -5.95246380975825, misfire -> -4.600157644164547, serious -> -9.03063937850481, wgbh -> -4.3528552573736015, brink -> -6.359227788032135, turnstile -> -4.898585790287632, gans -> -4.4953553199808844, isamu -> -4.4796069630127455, acronym -> -7.0597894348376355, youthful -> -6.348264483234865, sinister -> -6.602859225795668, comply -> -7.574301777744903, ebb -> -5.602118820879701, breaks -> -5.236441962829949, mafioso -> -4.484131857611035, marr -> -5.518255786913298, dns -> -6.392084603552947, forgotten -> -6.736017515727827, layton -> -5.728475087246572, waterv...
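// Why the values above are negative: with numDocs hard-coded to 10, any term appearing in more
// than 10 documents gets log(10/count) < 0, e.g. math.log(10.0 / 100) ≈ -2.30. With the real
// corpus size (millions of articles), the same term would score positive:
// math.log(4.0e6 / 100) ≈ 10.6. The counts here are made up for illustration.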
val idTerms = idfs.keys.zipWithIndex.toMap
val termIds = idTerms.map(_.swap)
// termIds is large and used in several places, so broadcast it.
// (note: calling .value on the driver just hands back the local collection, so the closures below
// capture the plain maps; the book keeps the Broadcast handles and calls .value inside the tasks)
val bTermIds = sc.broadcast(termIds).value
val bIdfs = sc.broadcast(idfs).value
val bIdTerms = sc.broadcast(idTerms).value
val termDocMatrix = docTermFreqs.map(_._2).map(termFreqs => {
  val docTotalTerms = termFreqs.values.sum // typo in the book: values() should be values
  val termScores = termFreqs.filter {
    case (term, freq) => bIdTerms.contains(term)
  }.map {
    case (term, freq) => (bIdTerms(term),
      bIdfs(term) * termFreqs(term) / docTotalTerms)
  }.toSeq
  Vectors.sparse(bIdTerms.size, termScores)
})
// termDocMatrix: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[10] at map at <console>:99
termDocMatrix.cache()
val mat = new RowMatrix(termDocMatrix)
val k = 10 // original : 1000
// in RunLSA.scala : val k = if (args.length > 0) args(0).toInt else 100
val svd = mat.computeSVD(k, computeU=true)
// # java.lang.OutOfMemoryError: Java heap space
// # -XX:OnOutOfMemoryError="kill -9 %p"
// # Executing /bin/sh -c "kill -9 11363"...
// /usr/lib/spark/bin/spark-shell: line 44: 11363 Killed
// "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
// k = 10 OK
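// One way to afford a larger k (an assumption, not part of the original run): launch the shell
// with more heap, since computeSVD materializes the n-by-k V matrix locally on the driver.
// $ spark-shell --driver-memory 8g --executor-memory 8g \
//     --jars target/ch06-lsa-1.0.2-jar-with-dependencies.jar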
// svd: org.apache.spark.mllib.linalg.SingularValueDecomposition[org.apache.spark.mllib.linalg.distributed.RowMatrix,org.apache.spark.mllib.linalg.Matrix] =
// SingularValueDecomposition(org.apache.spark.mllib.linalg.distributed.RowMatrix@6e63c7a0,[874.5968147274564,566.6543192053327,509.50910044225265,454.66005075189275,424.82823437080754,389.6360554949427,368.56883392687575,357.90655425497454,342.85972656956886,331.69166280914004],8.439268848830005E-6 2.020865219101397E-5 ... (10 total)
// 3.0862992102284116E-6 6.617427606824673E-6 ...
// 1.8824418738801623E-5 2.3732964695046555E-5 ...
// 1.0500150984320169E-5 1.9438089761772934E-5 ...
// 4.978838951252806E-4 7.897882750222361E-4 ...
// 4.8058326325708486E-5 8.077698461211198E-5 ...
// 1.5016628935073547E-6 2.9483585966287222E-6 ....
// println("Singular values: " + svd.s)
// // => Singular values: [874.5968147274564,566.6543192053327,509.50910044225265,454.66005075189275,424.82823437080754,389.6360554949427,368.56883392687575,357.90655425497454,342.85972656956886,331.69166280914004]
// val v = svd.v
// => <console>:108: error: value v is not a member of org.apache.spark.mllib.linalg.SingularValueDecomposition[org.apache.spark.mllib.linalg.distributed.RowMatrix,org.apache.spark.mllib.linalg.Matrix]
val v = svd.V
val numConcepts = 10
val topTerms = new ArrayBuffer[Seq[(String, Double)]]()
// the step below also takes some time (a few minutes)
val docIds = docTermFreqs.map(_._1).zipWithUniqueId().map(_.swap).collectAsMap().toMap
// # java.lang.OutOfMemoryError: Java heap space
// # -XX:OnOutOfMemoryError="kill -9 %p"
// # Executing /bin/sh -c "kill -9 26024"...
// /usr/lib/spark/bin/spark-shell: line 44: 26024 Killed "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
val arr = v.toArray // missing from the original paste; V's entries are laid out column-major
for (i <- 0 until numConcepts) {
  val offs = i * v.numRows
  val termWeights = arr.slice(offs, offs + v.numRows).zipWithIndex
  val sorted = termWeights.sortBy(-_._1)
  // note: the book's topTermsInTopConcepts takes only a handful of terms per concept here,
  // not numTerms = 50000
  topTerms += sorted.take(numTerms).map {
    case (score, id) => (termIds(id), score)
  }
}
topTerms
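// A sketch (not part of the original run) of the document-side query, mirroring topTerms but over
// svd.U. It assumes the ids from zipWithUniqueId() line up with the row order of termDocMatrix,
// i.e. that docIds was built from the same, unshuffled RDD; "10" docs per concept is arbitrary.
// val topDocs = new ArrayBuffer[Seq[(String, Double)]]()
// for (i <- 0 until numConcepts) {
//   val docWeights = svd.U.rows.map(_.toArray(i)).zipWithUniqueId()
//   topDocs += docWeights.top(10).map {
//     case (score, id) => (docIds(id), score)
//   }
// }
// topDocs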