@mdespriee · Created June 29, 2017
Example of how to build LDA incrementally in Spark, with comparison to one-shot learning.
// This code is related to PR https://github.com/apache/spark/pull/17461
// It shows how to use the setInitialModel() param of LDA to build a model incrementally,
// and compares the performance (perplexity) with a model built in one shot.
import scala.collection.mutable
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.clustering.{LDA, LDAModel}
import org.apache.spark.ml.feature._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
// scalastyle:off println
object LDAIncrementalExample {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName(s"${this.getClass.getSimpleName}")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    import spark.implicits._

    val dataset = spark.read.text("/home/mde/workspaces/spark-project/spark/docs/*md").toDF("doc")
      .where(length($"doc") > 0)
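    // Note: spark.read.text() produces one row per line, so every non-empty line of the
    // markdown files is treated as a separate "document" in this example.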
println(s"Nb documents = ${dataset.count()}")
dataset.show()
// Let's prepare a test set for LDA perplexity eval throughout this example
val splits = dataset.randomSplit(Array(0.8, 0.2), 15L)
val (train, test) = (splits(0), splits(1))
// Build a LDA in one-shot
val vocabSize = 30
val k = 10
val iter = 30
println(s"One-Shot build, vocabSize=$vocabSize, k=$k, maxIterations=$iter")
// Prepare dataset : tokenize and build a vocabulary
val (dataprep, vocab) = buildDataPrepPipeline(train, vocabSize)
// Build a LDA
val vectorized = dataprep.transform(train)
val ldaModel = buildModel(vectorized, k, iter)
showTopics(spark, vocab, ldaModel)
// evaluate
val testVect = dataprep.transform(test)
    val perplexity = ldaModel.logPerplexity(testVect)
    println(s"Perplexity=$perplexity")
println("---------------------------------")
println("")
// ---------------------------------------
// Build a LDA incrementally
// - we assume the same tokenisation, and that vocabulary is stable
// (we reuse the one previously built)
// - let's say the data will come incrementally, in 10 chunks
val nbChunks = 10
val chunks = train.randomSplit(Array.fill(nbChunks)(1D / nbChunks), 7L)
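    // Equal relative weights: each chunk gets roughly 1/nbChunks of the training data,
    // and the fixed seed keeps the split reproducible across runs.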
    var ldaModelIncr: LDAModel = null
    var idx = 0
    for (chunk <- chunks) {
      idx += 1
      println(s"Incremental, chunk=$idx, vocabSize=$vocabSize, k=$k, maxIterations=$iter")
      val chunkVect = dataprep.transform(chunk)
      // Warm-start from the model trained on the previous chunks (null on the first pass)
      ldaModelIncr = buildModel(chunkVect, k, iter, ldaModelIncr)
      showTopics(spark, vocab, ldaModelIncr)
      val perplexity = ldaModelIncr.logPerplexity(testVect)
      println(s"Perplexity=$perplexity")
      println("---------------------------------")
    }

    spark.stop()
  }
  def buildDataPrepPipeline(dataset: DataFrame, vocabSize: Int): (PipelineModel, Array[String]) = {
    // Default English stop words, plus HTML/markdown markup tokens found in the docs
    val stop = StopWordsRemover.loadDefaultStopWords("english") ++
      Array("tr", "td", "div", "class", "table", "html")
    val tokenizer = new RegexTokenizer().setInputCol("doc").setOutputCol("rawwords")
      .setGaps(false).setPattern("[a-zA-Z]{3,}")
    val stopremover = new StopWordsRemover().setInputCol("rawwords")
      .setOutputCol("words").setStopWords(stop)
    val vectorizer = new CountVectorizer().setInputCol("words").setOutputCol("features")
      .setVocabSize(vocabSize)
      .setMinDF(2)
    val stages = Array(tokenizer, stopremover, vectorizer)
    val pipeline = new Pipeline().setStages(stages)
    val model = pipeline.fit(dataset)
    (model, model.stages(2).asInstanceOf[CountVectorizerModel].vocabulary)
  }
  def buildModel(dataset: DataFrame, k: Int, maxIter: Int,
                 previousModel: LDAModel = null): LDAModel = {
    val lda = new LDA()
      .setK(k)
      .setFeaturesCol("features")
      .setMaxIter(maxIter)
      .setOptimizer("online")
    // Warm-start from the previous model when one is available
    // (setInitialModel() is the param added by the PR referenced above)
    if (previousModel != null) {
      lda.setInitialModel(previousModel)
    }
    lda.fit(dataset)
  }
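  // Usage sketch (mirrors the incremental loop in main() above): start with no model,
  // then feed the model returned for each chunk back in as the initial model:
  //   var model: LDAModel = null
  //   for (chunk <- chunks) {
  //     model = buildModel(dataprep.transform(chunk), k, iter, model)
  //   }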
  def showTopics(spark: SparkSession, vocab: Array[String], ldaModel: LDAModel): Unit = {
    import spark.implicits._
    val bc = spark.sparkContext.broadcast(vocab)
    // Map each topic's term indices back to the corresponding vocabulary words
    val topicWords = udf { (indices: mutable.WrappedArray[_]) =>
      indices.map {
        case v: Int => bc.value(v)
      }
    }
    ldaModel.describeTopics().select(topicWords($"termIndices").as("topics")).show(false)
  }
}
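// Note: this example relies on the setInitialModel() param introduced by the PR referenced
// at the top, so it needs a Spark build that includes that patch.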