-
-
Save seacloud9/90c35ac3ab3c88a7c18e53437aab954a to your computer and use it in GitHub Desktop.
Pipe a Kafka consumer to a WebSocket on Play! Framework.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
kafka { | |
consumer { | |
group.id = "default_consumer_group" | |
zookeeper.connect = "localhost:2181" | |
auto.offset.reset = "smallest" | |
consumer.timeout.ms = "-1" | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package controllers | |
import java.util.Properties | |
import com.typesafe.config.ConfigFactory | |
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, Whitelist} | |
import kafka.serializer.StringDecoder | |
import play.api.libs.iteratee.{Enumerator, Iteratee} | |
import play.api.mvc.{Controller, WebSocket} | |
import scala.collection.JavaConversions._ | |
object KafkaFeed extends Controller { | |
private val kafkaConsumerConfig = | |
new ConsumerConfig(ConfigFactory.load().getConfig("kafka.consumer").entrySet().foldRight(new Properties) { | |
(item, props) => | |
props.setProperty(item.getKey, item.getValue.unwrapped().toString) | |
props | |
}) | |
private def connect(config: ConsumerConfig) = Consumer.create(config) | |
private def consume(topic: String, connection: ConsumerConnector) = | |
connection.createMessageStreamsByFilter(new Whitelist(topic), 1, new StringDecoder, new StringDecoder).headOption.map(_.toStream) | |
def listenTo(topic: String) = WebSocket.using[String] { _ => | |
val connection = connect(kafkaConsumerConfig) | |
var connected = true | |
val endOnDisconnection = Iteratee.foreach[String](println).map { _ => | |
connection.shutdown() | |
connected = false | |
} | |
val pipeFromKafka = consume(topic, connection) | |
.map(Enumerator.unfold(_) { s => if (connected) Some(s.tail, s.head.message()) else None }) | |
.getOrElse(Enumerator.eof[String]) | |
endOnDisconnection -> pipeFromKafka | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment