Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
import org.apache.spark.ml.feature.{CountVectorizer, RegexTokenizer, StopWordsRemover}
import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}
import org.apache.spark.mllib.linalg.Vector
import sqlContext.implicits._
// Model / feature hyperparameters used throughout the script.
val numTopics = 100     // number of LDA topics (k)
val maxIterations = 100 // intended number of LDA training iterations
val vocabSize = 10000   // keep only this many most frequent terms as features
/**
 * Data Preprocessing
 */
// Load the raw articles (presumably one String per document — TODO confirm), pair each
// with a 0-based Long index via zipWithIndex, and expose the pairs as a DataFrame with
// columns "text" and "docId".
val rawTextRDD = ... // loaded from S3
val docDF = {
  val indexedDocs = rawTextRDD.zipWithIndex()
  indexedDocs.toDF("text", "docId")
}
// Split each document into words. With gaps disabled, the pattern describes the tokens
// themselves, so every maximal run of Unicode letters becomes one token.
val tokens = {
  val tokenizer = new RegexTokenizer()
    .setInputCol("text")
    .setOutputCol("words")
    .setGaps(false)
    .setPattern("\\p{L}+")
  tokenizer.transform(docDF)
}
// Drop stopwords from the token lists; matching ignores case.
val stopwords: Array[String] = ... // loaded from S3
val filteredTokens = {
  val remover = new StopWordsRemover()
    .setInputCol("words")
    .setOutputCol("filtered")
    .setCaseSensitive(false)
    .setStopWords(stopwords)
  remover.transform(tokens)
}
// Limit to the `vocabSize` most common words and convert each document into a
// term-count vector; cvModel.vocabulary maps each vector index back to its word.
val cvModel = new CountVectorizer()
  .setInputCol("filtered")
  .setOutputCol("features")
  .setVocabSize(vocabSize)
  .fit(filteredTokens)
// LDA.run consumes an RDD[(Long, Vector)] of (docId, term counts).
// Read the columns by name through `.rdd` instead of pattern matching on `Row`:
// `Row` is not imported by this script, so `case Row(...)` did not compile.
// NOTE(review): assumes the "features" column holds org.apache.spark.mllib.linalg.Vector
// (Spark 1.x). On Spark 2.x it is an ml.linalg.Vector and needs Vectors.fromML —
// TODO confirm target Spark version.
val countVectors = cvModel.transform(filteredTokens)
  .select("docId", "features")
  .rdd
  .map(row => (row.getAs[Long]("docId"), row.getAs[Vector]("features")))
  .cache() // reused by count() below and by every LDA iteration
/**
 * Configure and run LDA
 */
// Fraction of the corpus sampled per online-LDA iteration.
// 2.0 / maxIterations means each document is visited about twice in expectation over the
// whole run. Adding 1.0 / corpusSize grows the expected mini-batch by about one document,
// which makes the optimizer more robust on tiny datasets.
val mbf = {
  val corpusSize = countVectors.count()
  // Fail fast: with no documents, 1.0 / corpusSize would silently become Infinity.
  require(corpusSize > 0, "LDA needs at least one document in countVectors")
  2.0 / maxIterations + 1.0 / corpusSize
}
val lda = new LDA()
  .setOptimizer(new OnlineLDAOptimizer().setMiniBatchFraction(math.min(1.0, mbf)))
  .setK(numTopics)
  // Use the configured iteration count. This was hard-coded to 2, which ignored
  // `maxIterations` while `mbf` is sized for `maxIterations` passes. As a result, only
  // about 2 * mbf of the corpus was ever sampled.
  .setMaxIterations(maxIterations)
  .setDocConcentration(-1) // use default symmetric document-topic prior
  .setTopicConcentration(-1) // use default symmetric topic-word prior
val startTime = System.nanoTime()
val ldaModel = lda.run(countVectors)
val elapsed = (System.nanoTime() - startTime) / 1e9 // wall-clock training time in seconds
/**
 * Print results.
 */
// Training-time summary.
println("Finished training LDA model. Summary:")
println(s"Training time (sec)\t$elapsed")
println("==========")
// For every topic, list its 10 highest-weighted terms with their weights. Term indices
// are mapped back to words through the CountVectorizer vocabulary.
val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10)
val vocabArray = cvModel.vocabulary
val topics = topicIndices.map { indexedTopic =>
  val (termIds, weights) = indexedTopic
  termIds.map(id => vocabArray(id)).zip(weights)
}
println(s"$numTopics topics:")
for ((topic, topicNo) <- topics.zipWithIndex) {
  println(s"TOPIC $topicNo")
  for ((term, weight) <- topic) println(s"$term\t$weight")
  println("==========")
}
@kuza55
kuza55 commented Jan 15, 2016

I noticed that the LDA model has maxIterations explicitly set to 2 on line 57, rather than the 100 defined at the top of the file; does this affect the benchmarks in the associated blog post?

@abrocod
abrocod commented Feb 5, 2016

The blog post mentioned that we can also query for the top topics that each document talks about. Could you explain how to do this?

@akashsethi24

Can you specify how much memory is needed to run this pipeline?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment