Created March 24, 2015 23:56
LDA Example: Modeling topics in the Spark documentation
This example uses Scala. Please see the MLlib documentation for a Java example.
Try running this code in the Spark shell. It may produce different topics each time (since LDA includes some randomization), but it should give topics similar to those listed above.
This example is paired with a blog post on LDA in Spark:
import scala.collection.mutable
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

// Load documents from text files, 1 document per file
val corpus: RDD[String] = sc.wholeTextFiles("docs/*.md").map(_._2)

// Split each document into a sequence of terms (words)
val tokenized: RDD[Seq[String]] =
  corpus.map(_.toLowerCase.split("\\s")).map(_.filter(_.length > 3).filter(_.forall(java.lang.Character.isLetter)))

// Choose the vocabulary.
//   termCounts: Sorted list of (term, termCount) pairs
val termCounts: Array[(String, Long)] =
  tokenized.flatMap(_.map(_ -> 1L)).reduceByKey(_ + _).collect().sortBy(-_._2)
//   vocabArray: Chosen vocab (removing common terms)
val numStopwords = 20
val vocabArray: Array[String] =
  termCounts.takeRight(termCounts.size - numStopwords).map(_._1)
//   vocab: Map term -> term index
val vocab: Map[String, Int] = vocabArray.zipWithIndex.toMap

// Convert documents into term count vectors
val documents: RDD[(Long, Vector)] =
  tokenized.zipWithIndex.map { case (tokens, id) =>
    val counts = new mutable.HashMap[Int, Double]()
    tokens.foreach { term =>
      if (vocab.contains(term)) {
        val idx = vocab(term)
        counts(idx) = counts.getOrElse(idx, 0.0) + 1.0
      }
    }
    (id, Vectors.sparse(vocab.size, counts.toSeq))
  }

// Set LDA parameters
val numTopics = 10
val lda = new LDA().setK(numTopics).setMaxIterations(10)

// The default (EM) optimizer produces a DistributedLDAModel, which provides logLikelihood
val ldaModel = lda.run(documents).asInstanceOf[DistributedLDAModel]
val avgLogLikelihood = ldaModel.logLikelihood / documents.count()

// Print topics, showing top-weighted 10 terms for each topic.
val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10)
topicIndices.foreach { case (terms, termWeights) =>
  println("TOPIC:")
  terms.zip(termWeights).foreach { case (term, weight) =>
    println(s"${vocabArray(term.toInt)}\t$weight")
  }
  println()
}
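The preprocessing above (tokenizing, dropping the numStopwords most frequent terms, and turning each document into term counts) can be tried without a cluster. The sketch below mirrors those steps on plain Scala collections instead of RDDs. The object name LdaPreprocessingSketch is made up for illustration, and the sketch does not train LDA itself.

```scala
import scala.collection.mutable

// Hypothetical local sketch of the example's preprocessing, using plain
// Scala collections in place of RDDs.
object LdaPreprocessingSketch {
  // Same tokenization rule as the example: lowercase, split on whitespace,
  // keep only purely alphabetic tokens longer than 3 characters.
  def tokenize(doc: String): Seq[String] =
    doc.toLowerCase.split("\\s").toSeq
      .filter(_.length > 3)
      .filter(_.forall(java.lang.Character.isLetter))

  // Count terms across all documents, sort by descending count,
  // and drop the `numStopwords` most frequent terms.
  def chooseVocab(tokenized: Seq[Seq[String]], numStopwords: Int): Array[String] = {
    val termCounts: Array[(String, Long)] =
      tokenized.flatten
        .groupBy(identity)
        .map { case (term, occurrences) => (term, occurrences.size.toLong) }
        .toArray
        .sortBy(-_._2)
    termCounts.drop(numStopwords).map(_._1)
  }

  // Sparse term-count representation of one document: term index -> count.
  // Terms outside the vocabulary are ignored.
  def countVector(tokens: Seq[String], vocab: Map[String, Int]): Map[Int, Double] = {
    val counts = new mutable.HashMap[Int, Double]()
    tokens.foreach { term =>
      vocab.get(term).foreach { idx =>
        counts(idx) = counts.getOrElse(idx, 0.0) + 1.0
      }
    }
    counts.toMap
  }
}
```

For example, tokenize("The Spark shell, Spark runs 123abc models") keeps only spark, spark, runs and models: "the" is too short, and "shell," and "123abc" contain non-letter characters.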
alex9311 commented Jun 17, 2016

How can I get consistent topic distributions? I know LDA uses randomness in training, but if I use setSeed, shouldn't the topics and the computed topic distributions be the same on every run?

val ldaParams: LDA = new LDA().setK(10).setMaxIterations(60).setSeed(10L)
val distributedLDAModel: DistributedLDAModel = ldaParams.run(documents).asInstanceOf[DistributedLDAModel] // documents: RDD[(Long, Vector)], as in the example above
val topicDistributions: Map[Long,Vector] = distributedLDAModel.topicDistributions.collect.toMap //produces different results on each run

I am training an LDA model on Wikipedia articles (4 million documents, ~14 GB of data). I am running a Scala script on one machine with ~98 GB of memory, in the Spark shell with the following parameters:

$SPARK_HOME/bin/spark-shell --executor-memory 2G --driver-memory 25G --total-executor-cores 10 --conf spark.driver.maxResultSize=50g

My Scala code is very similar to the code above. No matter what values I use for driver memory, executor memory, or maxResultSize, I get either an OutOfMemory error or a maxResultSize exceeded error.

Could you please help me figure out the right settings?


Hi, I have my data stored in a SQL table: I scraped newspaper articles and stored them in the table.
I import the content column (the article text) together with the document ID like this:

val data = sqlContext.sql("Select content,id from HT limit 2").toDF("text","docId").cache

data: org.apache.spark.sql.DataFrame = [text: string, docId: int]
|                text          | docId |
|The Panama Papers...|    1    |

Can someone please help me convert this into the standard input for this algorithm?

I found the solution myself.
I just mapped the data and got the input into the standard form:

import org.apache.spark.sql.Row
val corpus = data.rdd.map { x: Row => x.getAs[String]("text") }
corpus: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[83] at map at <console>:52

Now I don't want to print the final output. I want to save it in a DataFrame so that I can store it as a table in Hive.

val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 5)
topicIndices.foreach { case (terms, termWeights) =>
  println("TOPIC:")
  terms.zip(termWeights).foreach { case (term, weight) =>
    println(s"${vocabArray(term.toInt)}\t$weight")
  }
  println()
}
topicIndices: Array[(Array[Int], Array[Double])] = Array((Array(0, 2, 3, 1, 4),Array(0.00350628017263507, 0.0034694474589591524, 0.003438757757561852, 0.003436255047934638, 0.003163149677097968)), (Array(0, 3, 2, 1, 4),Array(0.0036033828372688545, 0.003475759951074189, 0.0034649895837134807, 0.003435509739061473, 0.0031197044381247384)), (Array(0, 2, 1, 3, 4),Array(0.0035777177552655955, 0.0035349785708481277, 0.003533272120787881, 0.003387443930605837, 0.003293978337255521)), (Array(0, 1, 2, 3, 4),Array(0.003582087659862847, 0.003542792277259787, 0.0034411804067727104, 0.003396808386547253, 0.0032547793346400836)), (Array(0, 3, 2, 1, 4),Array(0.003564513879334988, 0.0034774762675255965, 0.0034378278250957604, 0.0033565986686143186, 0.0030693802853424055)), (Array(0, 2, 3, 1, 4),Array(0...
TOPIC:
would   0.00350628017263507
what    0.0034694474589591524
people  0.003438757757561852
india   0.003436255047934638
your    0.003163149677097968

Please help me tweak this code to save this data rather than print it to the screen.
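One way to keep the topics instead of printing them is to flatten the output of describeTopics into (topic, term, weight) rows, which Spark SQL can turn into a DataFrame. The helper below is a hypothetical plain-Scala sketch. The commented spark-shell lines assume Spark 1.x with sqlContext being a HiveContext, and lda_topics is a made-up table name.

```scala
// Hypothetical helper: flattens describeTopics output into
// (topicId, term, weight) rows, one row per term per topic.
object TopicRows {
  def toRows(
      topicIndices: Array[(Array[Int], Array[Double])],
      vocabArray: Array[String]): Seq[(Int, String, Double)] =
    topicIndices.toSeq.zipWithIndex.flatMap { case ((terms, weights), topicId) =>
      terms.zip(weights).toSeq.map { case (term, weight) =>
        (topicId, vocabArray(term), weight)
      }
    }
}

// In spark-shell (assuming sqlContext is a HiveContext), something like:
//   import sqlContext.implicits._
//   val topicsDF = sc.parallelize(TopicRows.toRows(topicIndices, vocabArray))
//     .toDF("topic", "term", "weight")
//   topicsDF.write.saveAsTable("lda_topics")  // made-up table name
```

Keeping the topic id as its own column means every row stays meaningful after the data leaves the driver, rather than relying on the order in which topics were printed.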

brejy commented Jan 11, 2017

I'd like to train an LDA model on data from Twitter. I ran this code up to the step
val lda = new LDA().setK(numTopics).setMaxIterations(10)
I get this error
error: reference to LDA is ambiguous;
it is imported twice in the same scope by import and import org.apache.spark.mllib.clustering.LDA
val lda = new LDA().setK(numTopics).setMaxIterations(10)
Could someone please help me? I use Spark 2.0.0.
Thanks!
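The error says the name LDA is bound by two imports in the same scope. A general Scala fix is to rename one of the imports, or to use a fully qualified name. The sketch below shows the renaming syntax with two standard-library classes that are both called HashMap. Applied here, it would look like import org.apache.spark.mllib.clustering.{LDA => MLlibLDA} followed by new MLlibLDA().setK(numTopics).setMaxIterations(10). The error text does not show which other import caused the clash, so this is an assumption about the setup.

```scala
// Both classes are named HashMap; importing both under the same name in one
// scope would make `HashMap` ambiguous, so the mutable one is renamed.
import scala.collection.immutable.HashMap
import scala.collection.mutable.{HashMap => MutableHashMap}

object ImportRenameSketch {
  // `HashMap` now refers only to the immutable class;
  // the mutable class is reachable only as `MutableHashMap`.
  val frozen: HashMap[String, Int] = HashMap("topics" -> 10)
  val growing: MutableHashMap[String, Int] = MutableHashMap("topics" -> 10)
  growing("iterations") = 10
}
```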

It seems that you have all figured out how to deal with the problem of similar (in my case, even identical) topics. Maybe someone can help me too?
My corpus consists of Tuple2<Long, Vector> elements, where each Vector is a DenseVector built from an array of doubles.

Then I just tried the following:

LDAModel ldaModel = new LDA().setK(3).setMaxIterations(100).run(corpus);

final scala.Tuple2<int[],double[]>[] topics = ldaModel.describeTopics(v_AnzahlTerme);

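List<Row> result2 = new java.util.ArrayList<>();
// Use each topic's position in the array as its id, so every (term, weight) pair is added once
for (int topicId = 0; topicId < topics.length; topicId++) {
  scala.Tuple2<int[], double[]> tmp = topics[topicId];
  for (int j = 0; j < tmp._1().length; j++) {
    result2.add(RowFactory.create(topicId, tmp._1()[j], tmp._2()[j]));
  }
}

return sc.parallelize(result2);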

Are there any obvious errors?

Thank you for any help!

maynewong commented Aug 29, 2018

With val corpus: RDD[String] = sc.wholeTextFiles("docs/*.md").map(_._2), all the topics get the same words.
It should be changed to val corpus = sc.textFile("docs/*.md"), which treats each line, rather than each file, as a document.

How can I change the terms to short phrases, such as "public holiday" or "banking holiday", with their corresponding weights? Should we consider using n-grams?
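One option for phrases, which is not part of the original example, is to add word bigrams to each document's tokens. Pairs such as "public holiday" are then counted as vocabulary terms and receive topic weights just like single words. The helper below is hypothetical. In the example it would run after the letter-only filter, because that filter would otherwise drop any term containing a space. It could be called as tokenized.map(tokens => NGramSketch.withBigrams(tokens)). For DataFrame pipelines, Spark ML also provides an NGram feature transformer (org.apache.spark.ml.feature.NGram).

```scala
// Hypothetical helper (not from the original example): appends word bigrams
// to a token sequence. Extends Serializable so it can be used inside RDD closures.
object NGramSketch extends Serializable {
  def withBigrams(tokens: Seq[String]): Seq[String] =
    // sliding(2) yields each adjacent pair; a single-token input yields a
    // one-element window, which the pattern below skips.
    tokens ++ tokens.sliding(2).collect { case Seq(a, b) => s"$a $b" }
}
```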


