Skip to content

Instantly share code, notes, and snippets.

@lucidfrontier45
Last active March 14, 2016 08:25
Show Gist options
  • Save lucidfrontier45/11420721c0078c5b7415 to your computer and use it in GitHub Desktop.
Save lucidfrontier45/11420721c0078c5b7415 to your computer and use it in GitHub Desktop.
LDA using Spark MLlib
// Runs Online Variational LDA over (user, product, rating) triples read from a
// tab-separated file and writes one "user_id,theta1,theta2,..." CSV line per user
// (the user's topic distribution) to args.out.
def startJob(args: RunArgs)(implicit sc: SparkContext): Unit = {
  val src = sc.textFile(args.fname, minPartitions = args.n_partitions).map(_.split("\t"))
    .flatMap {
      // NOTE(review): the original comment claimed 3 columns (user_id, product_name,
      // count), but this pattern only matches rows with exactly 4 fields and ignores
      // the last one — confirm the actual input format.
      case Array(u, p, r, _) => Some((u.toInt, p.toInt, r.toDouble))
      case _ => None // malformed rows are silently dropped
    }.persist()

  // Lookup tables mapping raw user/product ids to dense, sequential Long indices.
  val userid_map = src.map(_._1).distinct().zipWithIndex().collect().toMap
  val productid_map = src.map(_._2).distinct().zipWithIndex().collect().toMap
  val inverse_userid_map = userid_map.map(_.swap)

  // Broadcast the lookup tables to speed up the RDD map operations below.
  val b_userid_map = sc.broadcast(userid_map)
  val b_productid_map = sc.broadcast(productid_map)
  val b_inverse_userid_map = sc.broadcast(inverse_userid_map)

  // Re-key each triple by its dense indices; the product index must be an Int
  // because Vectors.sparse takes Int positions.
  val transformed_src = src.map { case (u, p, r) =>
    (b_userid_map.value(u), b_productid_map.value(p).toInt, r)
  }
  println(s"unique items = ${b_productid_map.value.size}")

  // Build the LDA input RDD[(Long, Vector)]: one sparse term-weight vector per user.
  val documents = transformed_src
    .map { case (u, p, r) => (u, (p, r)) }
    .groupByKey()
    .map { case (u, entries) =>
      (u, Vectors.sparse(b_productid_map.value.size, entries.toSeq))
    }.persist()
  documents.count() // force materialization before releasing the source RDD
  src.unpersist()

  // Run Online Variational LDA.
  val ldamodel = new LDA()
    .setK(args.k)
    .setMaxIterations(args.n_iter)
    .setOptimizer("online")
    .run(documents)
    .asInstanceOf[LocalLDAModel]

  // Map internal document ids back to the original user ids and format the output.
  val result = ldamodel.topicDistributions(documents)
    .map { case (i, v) =>
      val u = b_inverse_userid_map.value(i)
      s"$u,${v.toArray.mkString(",")}"
    }
  result.saveAsTextFile(args.out)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment