import org.apache.spark.ml.feature.{CountVectorizer, RegexTokenizer, StopWordsRemover}
import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.Row
import sqlContext.implicits._
val numTopics: Int = 100
val maxIterations: Int = 100
val vocabSize: Int = 10000
/**
 * Data Preprocessing
 */
// Load the raw articles, assign docIDs, and convert to DataFrame
val rawTextRDD = ... // loaded from S3
val docDF = rawTextRDD.zipWithIndex.toDF("text", "docId")
// Split each document into words
val tokens = new RegexTokenizer()
  .setGaps(false)
  .setPattern("\\p{L}+")
  .setInputCol("text")
  .setOutputCol("words")
  .transform(docDF)
// Filter out stopwords
val stopwords: Array[String] = ... // loaded from S3
val filteredTokens = new StopWordsRemover()
  .setStopWords(stopwords)
  .setCaseSensitive(false)
  .setInputCol("words")
  .setOutputCol("filtered")
  .transform(tokens)
// Limit to top `vocabSize` most common words and convert to word count vector features
val cvModel = new CountVectorizer()
  .setInputCol("filtered")
  .setOutputCol("features")
  .setVocabSize(vocabSize)
  .fit(filteredTokens)
val countVectors = cvModel.transform(filteredTokens)
  .select("docId", "features")
  .map { case Row(docId: Long, countVector: Vector) => (docId, countVector) }
  .cache()
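// countVectors is an RDD[(Long, Vector)] of (docId, term-count vector) pairs,
// which is the input format expected by mllib's LDA.run below.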
/**
 * Configure and run LDA
 */
val mbf = {
  // add (1.0 / actualCorpusSize) to the miniBatchFraction to be more robust on tiny datasets.
  val corpusSize = countVectors.count()
  2.0 / maxIterations + 1.0 / corpusSize
}
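// For example (hypothetical numbers, not from the benchmark): with maxIterations = 100
// and a corpus of 1,000,000 documents, mbf = 2.0 / 100 + 1.0 / 1000000 ≈ 0.020001,
// i.e. each online mini-batch samples roughly 2% of the corpus.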
val lda = new LDA()
  .setOptimizer(new OnlineLDAOptimizer().setMiniBatchFraction(math.min(1.0, mbf)))
  .setK(numTopics)
  .setMaxIterations(2)
  .setDocConcentration(-1) // use default symmetric document-topic prior
  .setTopicConcentration(-1) // use default symmetric topic-word prior
val startTime = System.nanoTime()
val ldaModel = lda.run(countVectors)
val elapsed = (System.nanoTime() - startTime) / 1e9
/**
 * Print results.
 */
// Print training time
println(s"Finished training LDA model. Summary:")
println(s"Training time (sec)\t$elapsed")
println(s"==========")
// Print the topics, showing the top-weighted terms for each topic.
val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10)
val vocabArray = cvModel.vocabulary
val topics = topicIndices.map { case (terms, termWeights) =>
  terms.map(vocabArray(_)).zip(termWeights)
}
println(s"$numTopics topics:")
topics.zipWithIndex.foreach { case (topic, i) =>
  println(s"TOPIC $i")
  topic.foreach { case (term, weight) => println(s"$term\t$weight") }
  println(s"==========")
}
kuza55 commented Jan 15, 2016

I noticed that the LDA model has maxIterations explicitly set to 2 on line 57, rather than the 100 defined at the top of the file; does this affect the benchmarks in the associated blog post?
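If the intent was to train with the maxIterations constant declared at the top of the script, the configuration would presumably read as follows (a sketch, not the setting used for the benchmark):

val lda = new LDA()
  .setOptimizer(new OnlineLDAOptimizer().setMiniBatchFraction(math.min(1.0, mbf)))
  .setK(numTopics)
  .setMaxIterations(maxIterations) // 100, as defined at the top of the file
  .setDocConcentration(-1)
  .setTopicConcentration(-1)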
abrocod commented Feb 5, 2016

The blog post mentioned that we can also query for the top topics that each document talks about. I am wondering how to do this?
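One way to do this (a minimal sketch, not from the blog post): the online optimizer returns a LocalLDAModel, whose topicDistributions method infers a topic-weight vector for each (docId, word-count vector) pair, so the top topics per document can be read off by sorting those weights.

import org.apache.spark.mllib.clustering.LocalLDAModel

val localModel = ldaModel.asInstanceOf[LocalLDAModel]
// RDD of (docId, topic-weight vector), one entry per training document
val docTopics = localModel.topicDistributions(countVectors)
// Show the 3 highest-weighted topics for a few documents
docTopics.take(5).foreach { case (docId, dist) =>
  val top = dist.toArray.zipWithIndex.sortBy(-_._1).take(3)
  println(s"doc $docId -> " + top.map { case (w, t) => s"topic $t ($w)" }.mkString(", "))
}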
akashsethi24 commented Jan 6, 2017

Can you specify the memory needed to run this pipeline?
imranshaikmuma commented Jan 29, 2018

Getting an error on:

val countVectors = cvModel.transform(filteredTokens)
  .select("docId", "features")
  .map { case Row(docId: Long, countVector: Vector) => (docId, countVector) }
  .cache()

input schema:

root
 |-- article: string (nullable = true)
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered: array (nullable = true)
 |    |-- element: string (containsNull = true)

error:

18/01/29 15:41:21 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID 33)
scala.MatchError: [69d5e6e6-554a-4e8f-9c53-341c10194260,(1735,[0...........................................................................])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
amirhadad commented Feb 7, 2018 (edited)

Details here:
https://stackoverflow.com/questions/36410804/convert-javarddrow-to-javarddvector/48654319#48654319

The cvModel transformation was broken (in Spark 2.x the DataFrame column holds an org.apache.spark.ml.linalg.Vector, so the pattern match against the mllib Vector fails); here is the fixed version of the code:

val cvModel = new CountVectorizer()
  .setInputCol("filtered")
  .setOutputCol("features")
  .setVocabSize(vocabSize)
  .fit(filteredTokens)
val countVectors = cvModel
  .transform(filteredTokens)
  .select("docId", "features")
  .rdd.map { case Row(docId: String, features: MLVector) =>
    (docId.toLong, Vectors.fromML(features))
  }
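For the pattern match above to compile, the snippet also assumes the Spark 2.x vector imports, presumably along these lines:

import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row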