Last active April 10, 2020 11:11
import org.apache.spark.ml.feature.{CountVectorizer, RegexTokenizer, StopWordsRemover}
import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.Row
import sqlContext.implicits._
val numTopics: Int = 100
val maxIterations: Int = 100
val vocabSize: Int = 10000
/**
 * Data Preprocessing
 */
// Load the raw articles, assign docIDs, and convert to DataFrame
val rawTextRDD = ... // loaded from S3
val docDF = rawTextRDD.zipWithIndex.toDF("text", "docId")
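// Split each document into words (RegexTokenizer splits on whitespace by default)
val tokens = new RegexTokenizer()
  .setInputCol("text")
  .setOutputCol("words")
  .transform(docDF)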
// Filter out stopwords
val stopwords: Array[String] = ... // loaded from S3
val filteredTokens = new StopWordsRemover()
  .setStopWords(stopwords)
  .setInputCol("words")
  .setOutputCol("filtered")
  .transform(tokens)
// Limit to top `vocabSize` most common words and convert to word count vector features
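val cvModel = new CountVectorizer()
  .setInputCol("filtered")
  .setOutputCol("features")
  .setVocabSize(vocabSize)
  .fit(filteredTokens)
val countVectors = cvModel.transform(filteredTokens)
  .select("docId", "features")
  .map { case Row(docId: Long, countVector: Vector) => (docId, countVector) }
  .cache() // reused by count() and by LDA training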
/**
 * Configure and run LDA
 */
val mbf = {
  // Add (1.0 / actualCorpusSize) to the mini-batch fraction to be more robust on tiny datasets.
  val corpusSize = countVectors.count()
  2.0 / maxIterations + 1.0 / corpusSize
}
val lda = new LDA()
  .setOptimizer(new OnlineLDAOptimizer().setMiniBatchFraction(math.min(1.0, mbf)))
  .setK(numTopics)
  .setMaxIterations(2) // hard-coded to 2 rather than the maxIterations value defined above
  .setDocConcentration(-1) // use default symmetric document-topic prior
  .setTopicConcentration(-1) // use default symmetric topic-word prior
val startTime = System.nanoTime()
val ldaModel = lda.run(countVectors)
val elapsed = (System.nanoTime() - startTime) / 1e9
/**
 * Print results.
 */
// Print training time
println(s"Finished training LDA model. Summary:")
println(s"Training time (sec)\t$elapsed")
// Print the topics, showing the top-weighted terms for each topic.
val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10)
val vocabArray = cvModel.vocabulary
val topics = topicIndices.map { case (terms, termWeights) =>
  terms.map(vocabArray(_)).zip(termWeights)
}
println(s"$numTopics topics:")
topics.zipWithIndex.foreach { case (topic, i) =>
  println(s"TOPIC $i")
  topic.foreach { case (term, weight) => println(s"$term\t$weight") }
}
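
The final step of the listing pairs each term index returned by `describeTopics` with its word from the `CountVectorizer` vocabulary. A minimal Spark-free sketch of that lookup follows. The vocabulary, indices, and weights are made-up stand-ins for `cvModel.vocabulary` and `ldaModel.describeTopics(...)`, and the object name `TopicTermsSketch` is hypothetical.

```scala
object TopicTermsSketch {
  // Hypothetical stand-in for cvModel.vocabulary
  val vocabArray: Array[String] = Array("spark", "data", "model", "topic")

  // Hypothetical stand-in for ldaModel.describeTopics(maxTermsPerTopic = 2):
  // one (termIndices, termWeights) pair per topic
  val topicIndices: Array[(Array[Int], Array[Double])] = Array(
    (Array(0, 1), Array(0.6, 0.4)),
    (Array(3, 2), Array(0.7, 0.3))
  )

  // Replace each term index with its vocabulary word and pair it with its weight
  val topics: Array[Array[(String, Double)]] = topicIndices.map {
    case (terms, termWeights) => terms.map(vocabArray(_)).zip(termWeights)
  }

  def main(args: Array[String]): Unit =
    topics.zipWithIndex.foreach { case (topic, i) =>
      println(s"TOPIC $i")
      topic.foreach { case (term, weight) => println(s"$term\t$weight") }
    }
}
```

The term indices are positions in the vocabulary array, so the same `cvModel` that produced the count vectors has to supply the vocabulary.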

kuza55 commented Jan 15, 2016

I noticed that the LDA model has maxIterations explicitly set to 2 on line 57, rather than the 100 defined at the top of the file; does this affect the benchmarks in the associated blog post?
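
For context on this question, the arithmetic can be checked without Spark. The online optimizer samples roughly a `miniBatchFraction` share of the corpus in each iteration, so the fraction times the number of iterations actually run approximates how many passes are made over the corpus. The sketch below reuses the gist's formula. The corpus size is a made-up figure, and `miniBatchFraction` and `expectedPasses` are hypothetical helper names, not Spark APIs.

```scala
object MiniBatchFractionSketch {
  /** Mini-batch fraction as computed in the gist, capped at 1.0. */
  def miniBatchFraction(maxIterations: Int, corpusSize: Long): Double =
    math.min(1.0, 2.0 / maxIterations + 1.0 / corpusSize)

  /** Approximate corpus passes: fraction sampled per iteration times iterations run. */
  def expectedPasses(fraction: Double, iterationsRun: Int): Double =
    fraction * iterationsRun

  def main(args: Array[String]): Unit = {
    val corpusSize = 1000000L // hypothetical corpus size
    val mbf = miniBatchFraction(maxIterations = 100, corpusSize = corpusSize)
    println(f"miniBatchFraction = $mbf%.6f")
    println(f"passes with 100 iterations = ${expectedPasses(mbf, 100)}%.4f")
    println(f"passes with 2 iterations = ${expectedPasses(mbf, 2)}%.4f")
  }
}
```

Under these assumptions the fraction is sized for about two passes over 100 iterations, so running only 2 iterations would touch a small share of the corpus. Whether that matches the blog post's benchmark setup is not something this sketch can confirm.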

abrocod commented Feb 5, 2016

The blog post mentioned that we can also query for the top topics that each document talks about. I am wondering how to do this?
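
One possible approach, offered as a sketch rather than the blog post's method: in the RDD-based API, `LocalLDAModel.topicDistributions` returns a topic-weight vector per document, and `DistributedLDAModel.topTopicsPerDocument` covers EM-trained models. With the online optimizer used in this gist, the model returned by `lda.run` should be a `LocalLDAModel`, so a cast such as `ldaModel.asInstanceOf[LocalLDAModel].topicDistributions(countVectors)` may be needed. Picking the top topics from one such vector could then look like the Spark-free code below. The `topTopics` helper and the sample weights are hypothetical.

```scala
object TopTopicsSketch {
  /** Indices and weights of the k largest entries of a topic distribution, largest first. */
  def topTopics(distribution: Array[Double], k: Int): Array[(Int, Double)] =
    distribution.zipWithIndex
      .sortBy { case (weight, _) => -weight }
      .take(k)
      .map { case (weight, topic) => (topic, weight) }

  def main(args: Array[String]): Unit = {
    // Hypothetical topic mixture for one document over 5 topics
    val doc = Array(0.05, 0.50, 0.10, 0.30, 0.05)
    topTopics(doc, 2).foreach { case (topic, weight) =>
      println(s"topic $topic\t$weight")
    }
  }
}
```

On real output, the vector's `toArray` would supply the `distribution` argument for each document.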

Can you specify the memory needed to run this pipeline?

I am getting an error on:
val countVectors = cvModel.transform(filteredTokens)
.select("docId", "features")
.map { case Row(docId: Long, countVector: Vector) => (docId, countVector) }

My input schema is:
|-- article: string (nullable = true)
|-- id: string (nullable = true)
|-- title: string (nullable = true)
|-- words: array (nullable = true)
| |-- element: string (containsNull = true)
|-- filtered: array (nullable = true)
| |-- element: string (containsNull = true)

18/01/29 15:41:21 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID 33)
scala.MatchError: [69d5e6e6-554a-4e8f-9c53-341c10194260,(1735,[0...........................................................................])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)

amirhadad commented Feb 7, 2018

The cvModel transformation was broken. Here is the fixed version of the code:

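// MLVector is org.apache.spark.ml.linalg.Vector; Vectors is org.apache.spark.mllib.linalg.Vectors
val cvModel = new CountVectorizer()
  .setInputCol("filtered")
  .setOutputCol("features")
  .setVocabSize(vocabSize)
  .fit(filteredTokens)
val countVectors = cvModel.transform(filteredTokens)
  .select("docId", "features")
  .rdd.map { case Row(docId: String, features: MLVector) =>
    (docId.toLong, Vectors.fromML(features)) }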
