Last active
March 14, 2016 08:25
-
-
Save lucidfrontier45/11420721c0078c5b7415 to your computer and use it in GitHub Desktop.
LDA using Spark MLlib
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Fits an online variational LDA model on user/product interaction data and
 * writes each user's topic distribution as text.
 *
 * Each user is treated as a "document" whose "words" are product ids, weighted by
 * the numeric third input column. Output lines have the form
 * `user_id,weight_topic0,weight_topic1,...`.
 *
 * @param args job settings; this method reads `fname` (input path), `n_partitions`
 *             (minimum input partitions), `k` (number of topics), `n_iter`
 *             (maximum iterations) and `out` (output path)
 * @param sc   Spark context used to read input, broadcast lookup tables and run LDA
 * @throws IllegalArgumentException if the input contains no parseable records
 */
def startJob(args: RunArgs)(implicit sc: SparkContext): Unit = {
  import scala.util.Try

  // Input is tab-separated: user_id, product_id, count, optionally followed by one
  // extra column that is ignored. Both ids are parsed as Int and count as Double.
  // Lines with the wrong number of columns, or that fail to parse (e.g. a header
  // row), are skipped rather than failing the whole job.
  val src = sc.textFile(args.fname, minPartitions = args.n_partitions)
    .map(_.split("\t"))
    .flatMap {
      case Array(u, p, r, rest @ _*) if rest.length <= 1 =>
        Try((u.toInt, p.toInt, r.toDouble)).toOption
      case _ => None
    }
    .persist()

  // Maps that convert raw user/product ids into unique sequential ids (0 until n).
  // They are collected to the driver, so the distinct ids must fit in driver memory.
  val userid_map = src.map(_._1).distinct().zipWithIndex().collect().toMap
  val productid_map = src.map(_._2).distinct().zipWithIndex().collect().toMap
  require(productid_map.nonEmpty, s"No parseable records found in ${args.fname}")
  val inverse_userid_map = userid_map.map(_.swap)

  // Broadcast the lookup tables to speed up the RDD map operations below.
  val b_userid_map = sc.broadcast(userid_map)
  val b_productid_map = sc.broadcast(productid_map)
  val b_inverse_userid_map = sc.broadcast(inverse_userid_map)

  // Number of distinct products, i.e. the dimension of every document vector.
  val vocabSize = productid_map.size
  println("unique items = %d".format(vocabSize))

  // Prepare the LDA input RDD[(Long, Vector)]: one sparse vector per user.
  // Repeated (user, product) rows are summed first, because Vectors.sparse
  // rejects duplicate indices.
  val documents = src
    .map { case (u, p, r) =>
      ((b_userid_map.value(u), b_productid_map.value(p).toInt), r)
    }
    .reduceByKey(_ + _)
    .map { case ((u, p), r) => (u, (p, r)) }
    .groupByKey()
    .mapValues(entries => Vectors.sparse(vocabSize, entries.toSeq))
    .persist()

  // Materialize the documents now so the cached raw input can be released.
  documents.count()
  src.unpersist()

  // Run online variational LDA; this optimizer produces a LocalLDAModel, which
  // is what provides topicDistributions.
  val fitted = new LDA()
    .setK(args.k)
    .setMaxIterations(args.n_iter)
    .setOptimizer("online")
    .run(documents)
  val ldamodel = fitted match {
    case local: LocalLDAModel => local
    case other =>
      throw new IllegalStateException(
        s"Expected a LocalLDAModel from the online optimizer, got ${other.getClass.getName}")
  }

  // Translate sequential user ids back to the original ids and emit CSV lines.
  val result = ldamodel.topicDistributions(documents)
    .map { case (i, v) =>
      val u = b_inverse_userid_map.value(i)
      s"$u,${v.toArray.mkString(",")}"
    }
  result.saveAsTextFile(args.out)

  documents.unpersist()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment