@migue
Created November 18, 2014 08:26
Liferay message boards classifier
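import org.apache.spark.mllib.clustering.KMeansModel
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
// conf, topics, numThreads, zkQuorum, group, modelFile, clusterNumber and Utils.featurize are assumed to be defined elsewhere.
val ssc = new StreamingContext(conf, Seconds(5)) // 5-second batch interval
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap // topic -> number of consumer threads
// Keep only the message payload; drop the Kafka key.
val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
// Rebuild the k-means model from the cluster centers saved as an object file.
val model = new KMeansModel(ssc.sparkContext.objectFile[Vector](modelFile.toString).collect())
// Keep only the messages that the model assigns to the requested cluster.
val filteredMessages = kafkaStream
  .filter(m => model.predict(Utils.featurize(m)) == clusterNumber.toInt)
filteredMessages.print()
ssc.start()
ssc.awaitTermination()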