@feynmanliang
Last active April 10, 2020 11:11
import org.apache.spark.ml.feature.{CountVectorizer, RegexTokenizer, StopWordsRemover}
import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.Row
import sqlContext.implicits._
val numTopics: Int = 100
val maxIterations: Int = 100
val vocabSize: Int = 10000
/**
* Data Preprocessing
*/
// Load the raw articles, assign docIDs, and convert to DataFrame
val rawTextRDD = ... // loaded from S3
val docDF = rawTextRDD.zipWithIndex.toDF("text", "docId")
// Split each document into words
val tokens = new RegexTokenizer()
  .setGaps(false)
  .setPattern("\\p{L}+")
  .setInputCol("text")
  .setOutputCol("words")
  .transform(docDF)
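// Aside (illustration, not part of the original gist): with gaps=false the
// regex matches tokens rather than separators, so "\\p{L}+" keeps only maximal
// runs of Unicode letters and silently drops digits and punctuation:
val sampleTokens = "\\p{L}+".r.findAllIn("Spark 1.5, LDA!").toList
// sampleTokens == List("Spark", "LDA")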
// Filter out stopwords
val stopwords: Array[String] = ... // loaded from S3
val filteredTokens = new StopWordsRemover()
  .setStopWords(stopwords)
  .setCaseSensitive(false)
  .setInputCol("words")
  .setOutputCol("filtered")
  .transform(tokens)
// Limit to top `vocabSize` most common words and convert to word count vector features
val cvModel = new CountVectorizer()
  .setInputCol("filtered")
  .setOutputCol("features")
  .setVocabSize(vocabSize)
  .fit(filteredTokens)
val countVectors = cvModel.transform(filteredTokens)
  .select("docId", "features")
  .map { case Row(docId: Long, countVector: Vector) => (docId, countVector) }
  .cache()
/**
* Configure and run LDA
*/
val mbf = {
  // Add (1.0 / corpusSize) to the miniBatchFraction to be more robust on tiny datasets.
  val corpusSize = countVectors.count()
  2.0 / maxIterations + 1.0 / corpusSize
}
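// Sketch (illustration only, not in the original gist): the same formula as a
// standalone helper. With maxIterations = 100 and a 1M-document corpus this
// gives ~0.02; for a 10-document corpus it rises to 0.12, so each mini-batch
// still covers roughly one document, and math.min caps it at 1.0 for very
// small corpora.
def miniBatchFraction(maxIterations: Int, corpusSize: Long): Double =
  math.min(1.0, 2.0 / maxIterations + 1.0 / corpusSize)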
val lda = new LDA()
  .setOptimizer(new OnlineLDAOptimizer().setMiniBatchFraction(math.min(1.0, mbf)))
  .setK(numTopics)
  .setMaxIterations(2)
  .setDocConcentration(-1) // use default symmetric document-topic prior
  .setTopicConcentration(-1) // use default symmetric topic-word prior
val startTime = System.nanoTime()
val ldaModel = lda.run(countVectors)
val elapsed = (System.nanoTime() - startTime) / 1e9
/**
* Print results.
*/
// Print training time
println(s"Finished training LDA model. Summary:")
println(s"Training time (sec)\t$elapsed")
println(s"==========")
// Print the topics, showing the top-weighted terms for each topic.
val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10)
val vocabArray = cvModel.vocabulary
val topics = topicIndices.map { case (terms, termWeights) =>
  terms.map(vocabArray(_)).zip(termWeights)
}
println(s"$numTopics topics:")
topics.zipWithIndex.foreach { case (topic, i) =>
  println(s"TOPIC $i")
  topic.foreach { case (term, weight) => println(s"$term\t$weight") }
  println(s"==========")
}
@kuza55 commented Jan 15, 2016
I noticed that the LDA model has maxIterations explicitly set to 2 on line 57, rather than the 100 defined at the top of the file; does this affect the benchmarks in the associated blog post?

@abrocod commented Feb 5, 2016
The blog post mentioned that we can also query for the top topics that each document talks about. I am wondering how to do this?
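
One way to do this, assuming the online optimizer was used so that `ldaModel` is a `LocalLDAModel` (a sketch, untested against this exact gist; the names `localModel` and `docTopics` are illustrative):

    import org.apache.spark.mllib.clustering.LocalLDAModel

    val localModel = ldaModel.asInstanceOf[LocalLDAModel]
    // RDD of (docId, topic-mixture vector); entry i is the weight of topic i
    val docTopics = localModel.topicDistributions(countVectors)
    docTopics.take(5).foreach { case (docId, topics) => println(s"$docId -> $topics") }

`topicDistributions` is available on `LocalLDAModel` from Spark 1.5 onward; with a `DistributedLDAModel` (EM optimizer) there is a `topTopicsPerDocument(k)` method instead.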

@akashsethi24

Can you specify the memory needed to run this pipeline?

@imranshaikmuma
Getting an error on:

    val countVectors = cvModel.transform(filteredTokens)
      .select("docId", "features")
      .map { case Row(docId: Long, countVector: Vector) => (docId, countVector) }
      .cache()

input schema:
root
|-- article: string (nullable = true)
|-- id: string (nullable = true)
|-- title: string (nullable = true)
|-- words: array (nullable = true)
| |-- element: string (containsNull = true)
|-- filtered: array (nullable = true)
| |-- element: string (containsNull = true)

error:
18/01/29 15:41:21 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID 33)
scala.MatchError: [69d5e6e6-554a-4e8f-9c53-341c10194260,(1735,[0...........................................................................])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)

@amirhadad commented Feb 7, 2018
Details here:
https://stackoverflow.com/questions/36410804/convert-javarddrow-to-javarddvector/48654319#48654319

The cvModel transformation was broken; here is the fixed version of the code:

    import org.apache.spark.ml.linalg.{Vector => MLVector}
    import org.apache.spark.mllib.linalg.Vectors

    val cvModel = new CountVectorizer()
      .setInputCol("filtered")
      .setOutputCol("features")
      .setVocabSize(vocabSize)
      .fit(filteredTokens)

    val countVectors = cvModel
      .transform(filteredTokens)
      .select("docId", "features")
      .rdd
      .map { case Row(docId: String, features: MLVector) =>
        (docId.toLong, Vectors.fromML(features))
      }
