import org.apache.spark.ml.feature.{CountVectorizer, RegexTokenizer, StopWordsRemover}
import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.Row // needed for the pattern match when extracting count vectors below
import sqlContext.implicits._
val numTopics: Int = 100
val maxIterations: Int = 100
val vocabSize: Int = 10000
/**
* Data Preprocessing
*/
// Load the raw articles, assign docIDs, and convert to DataFrame
val rawTextRDD = ... // loaded from S3
val docDF = rawTextRDD.zipWithIndex.toDF("text", "docId")
// Split each document into words
val tokens = new RegexTokenizer()
  .setGaps(false)
  .setPattern("\\p{L}+")
  .setInputCol("text")
  .setOutputCol("words")
  .transform(docDF)
// Filter out stopwords
val stopwords: Array[String] = ... // loaded from S3
val filteredTokens = new StopWordsRemover()
  .setStopWords(stopwords)
  .setCaseSensitive(false)
  .setInputCol("words")
  .setOutputCol("filtered")
  .transform(tokens)
// Limit to top `vocabSize` most common words and convert to word count vector features
val cvModel = new CountVectorizer()
  .setInputCol("filtered")
  .setOutputCol("features")
  .setVocabSize(vocabSize)
  .fit(filteredTokens)
val countVectors = cvModel.transform(filteredTokens)
  .select("docId", "features")
  .map { case Row(docId: Long, countVector: Vector) => (docId, countVector) }
  .cache()
/**
* Configure and run LDA
*/
val mbf = {
  // Add (1.0 / corpusSize) to the miniBatchFraction to be more robust on tiny datasets.
  val corpusSize = countVectors.count()
  2.0 / maxIterations + 1.0 / corpusSize
}
val lda = new LDA()
  .setOptimizer(new OnlineLDAOptimizer().setMiniBatchFraction(math.min(1.0, mbf)))
  .setK(numTopics)
  .setMaxIterations(2)
  .setDocConcentration(-1) // use default symmetric document-topic prior
  .setTopicConcentration(-1) // use default symmetric topic-word prior
val startTime = System.nanoTime()
val ldaModel = lda.run(countVectors)
val elapsed = (System.nanoTime() - startTime) / 1e9
/**
* Print results.
*/
// Print training time
println(s"Finished training LDA model. Summary:")
println(s"Training time (sec)\t$elapsed")
println(s"==========")
// Print the topics, showing the top-weighted terms for each topic.
val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10)
val vocabArray = cvModel.vocabulary
val topics = topicIndices.map { case (terms, termWeights) =>
  terms.map(vocabArray(_)).zip(termWeights)
}
println(s"$numTopics topics:")
topics.zipWithIndex.foreach { case (topic, i) =>
  println(s"TOPIC $i")
  topic.foreach { case (term, weight) => println(s"$term\t$weight") }
  println(s"==========")
}
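
As a quick sanity check on the fit (a sketch, not part of the original listing; it assumes the online optimizer configured above, whose run returns a LocalLDAModel), the training perplexity can also be printed:

import org.apache.spark.mllib.clustering.LocalLDAModel

val localModel = ldaModel.asInstanceOf[LocalLDAModel]
// Lower perplexity generally indicates a better fit to the training corpus.
println(s"Training log-perplexity: ${localModel.logPerplexity(countVectors)}")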
@kuza55 commented Jan 15, 2016

I noticed that the LDA model has maxIterations explicitly set to 2 on line 57, rather than the 100 defined at the top of the file; does this affect the benchmarks in the associated blog post?
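
For reference, a minimal sketch of the builder call with the constant restored (a guess at the original intent; it says nothing about the published benchmarks):

val lda = new LDA()
  .setOptimizer(new OnlineLDAOptimizer().setMiniBatchFraction(math.min(1.0, mbf)))
  .setK(numTopics)
  .setMaxIterations(maxIterations) // the 100 defined at the top of the file, not the literal 2
  .setDocConcentration(-1)
  .setTopicConcentration(-1)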

@abrocod commented Feb 5, 2016

The blog post mentioned that we can also query for the top topics that each document talks about. I am wondering how to do this?
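
One way to do this (a sketch, not code from the blog post; it assumes the countVectors RDD and ldaModel from the gist above are still in scope): the online optimizer returns a LocalLDAModel, which can score documents, while the EM optimizer's DistributedLDAModel exposes top topics per document directly.

import org.apache.spark.mllib.clustering.{DistributedLDAModel, LocalLDAModel}

ldaModel match {
  case m: LocalLDAModel =>
    // Per-document topic mixture (a vector of length numTopics), keyed by docId.
    m.topicDistributions(countVectors).take(3).foreach { case (docId, dist) =>
      // Indices and weights of the three highest-weighted topics for this document.
      val top = dist.toArray.zipWithIndex.sortBy(-_._1).take(3)
      println(s"doc $docId -> ${top.map { case (w, t) => s"topic $t ($w)" }.mkString(", ")}")
    }
  case m: DistributedLDAModel =>
    // EM optimizer: top topics per document are available directly.
    m.topTopicsPerDocument(3).take(3).foreach(println)
}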

@akashsethi24 commented Jan 6, 2017

Can you specify the memory usage needed to run this pipeline?
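
There is no single number: memory scales with the cached corpus rather than the model (the topic-word matrix is only vocabSize x numTopics doubles, i.e. 10,000 x 100 ≈ 8 MB here), so the cached countVectors RDD dominates. A sketch of illustrative settings only (assumed values, not measured requirements):

import org.apache.spark.SparkConf

// Illustrative sizing; tune to your corpus. Set before the context is created.
val conf = new SparkConf()
  .set("spark.executor.memory", "8g")      // room to cache the count vectors
  .set("spark.driver.maxResultSize", "2g") // topic summaries are collected to the driver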

@imranshaikmuma commented Jan 29, 2018

Getting an error on:

val countVectors = cvModel.transform(filteredTokens)
  .select("docId", "features")
  .map { case Row(docId: Long, countVector: Vector) => (docId, countVector) }
  .cache()

My input schema:

root
 |-- article: string (nullable = true)
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered: array (nullable = true)
 |    |-- element: string (containsNull = true)

Error:

18/01/29 15:41:21 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID 33)
scala.MatchError: [69d5e6e6-554a-4e8f-9c53-341c10194260,(1735,[0...........................................................................])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)

@amirhadad commented Feb 7, 2018

Details here:
https://stackoverflow.com/questions/36410804/convert-javarddrow-to-javarddvector/48654319#48654319
The cvModel transformation was broken; here is the fixed version of the code:

val cvModel = new CountVectorizer()
  .setInputCol("filtered")
  .setOutputCol("features")
  .setVocabSize(vocabSize)
  .fit(filteredTokens)

val countVectors = cvModel
  .transform(filteredTokens)
  .select("docId", "features")
  .rdd.map { case Row(docId: String, features: MLVector) =>
    (docId.toLong, Vectors.fromML(features))
  }
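
For completeness, the fixed snippet above presumably relies on these imports (assuming Spark 2.x), since it pattern-matches on the new ml Vector type and converts it to the mllib type that LDA expects:

import org.apache.spark.ml.linalg.{Vector => MLVector} // alias avoids clashing with the mllib Vector
import org.apache.spark.mllib.linalg.Vectors           // Vectors.fromML converts ml -> mllib
import org.apache.spark.sql.Row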