Skip to content

Instantly share code, notes, and snippets.

@stefanobaghino
Last active April 11, 2023 07:57
Show Gist options
  • Star 9 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save stefanobaghino/853c45f6355ad3a21b1f to your computer and use it in GitHub Desktop.
Save stefanobaghino/853c45f6355ad3a21b1f to your computer and use it in GitHub Desktop.
Pipe a Kafka consumer to a WebSocket on Play! Framework.
# Kafka settings for the application.
kafka {
# Every key in this section is copied verbatim into a java.util.Properties
# object and passed to Kafka's ConsumerConfig by controllers.KafkaFeed, so
# the key names must be valid Kafka consumer property names.
consumer {
# Consumer group the feed joins.
group.id = "default_consumer_group"
# ZooKeeper connection string (host:port) used by the consumer.
zookeeper.connect = "localhost:2181"
# NOTE(review): presumably makes a group with no committed offset start from
# the oldest available message -- confirm against the Kafka consumer docs.
auto.offset.reset = "smallest"
# NOTE(review): presumably -1 means "block indefinitely waiting for the next
# message" rather than timing out -- confirm against the Kafka consumer docs.
consumer.timeout.ms = "-1"
}
}
package controllers
import java.util.Properties
import com.typesafe.config.ConfigFactory
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, Whitelist}
import kafka.serializer.StringDecoder
import play.api.libs.iteratee.{Enumerator, Iteratee}
import play.api.mvc.{Controller, WebSocket}
import scala.collection.JavaConversions._
/**
 * Play controller that pipes the messages of a Kafka topic to a WebSocket.
 *
 * Consumer settings are taken from the `kafka.consumer` section of the
 * application configuration; every entry found there is forwarded to Kafka.
 */
object KafkaFeed extends Controller {

  /** Kafka consumer configuration built from every entry under `kafka.consumer`. */
  private val kafkaConsumerConfig = {
    val props = new Properties
    ConfigFactory.load().getConfig("kafka.consumer").entrySet().foreach { entry =>
      props.setProperty(entry.getKey, entry.getValue.unwrapped().toString)
    }
    new ConsumerConfig(props)
  }

  /** Opens a new consumer connection using the given configuration. */
  private def connect(config: ConsumerConfig): ConsumerConnector = Consumer.create(config)

  /**
   * Subscribes to `topic` with a single stream and returns that stream's
   * message iterator, or `None` if Kafka handed back no stream.
   *
   * The iterator is returned instead of a `Stream` on purpose: converting an
   * iterator to a `Stream` evaluates `hasNext` eagerly, which would block here
   * until the first message arrives, and forcing a `Stream`'s tail blocks until
   * the *following* message arrives, delaying every message by one.
   */
  private def consume(topic: String, connection: ConsumerConnector) =
    connection
      .createMessageStreamsByFilter(new Whitelist(topic), 1, new StringDecoder, new StringDecoder)
      .headOption
      .map(_.iterator())

  /**
   * WebSocket endpoint that sends every message of `topic` to the client.
   *
   * Text received from the client is printed to standard output. When the
   * client side of the socket finishes, the Kafka connection is shut down and
   * the outgoing feed stops.
   *
   * @param topic topic name, used as a Kafka whitelist filter
   */
  def listenTo(topic: String) = WebSocket.using[String] { _ =>
    val connection = connect(kafkaConsumerConfig)

    // Written by the inbound iteratee's completion callback and read by the
    // outbound enumerator; these may run on different threads, so the flag
    // has to be volatile for the write to be visible.
    @volatile var connected = true

    val endOnDisconnection = Iteratee.foreach[String](println).map { _ =>
      // Clear the flag before shutting down so the enumerator stops as soon
      // as its pending hasNext call returns.
      connected = false
      connection.shutdown()
    }

    val pipeFromKafka = consume(topic, connection)
      .map { messages =>
        Enumerator.unfold(messages) { it =>
          // hasNext blocks until a message is available; guarding on it also
          // ends the feed cleanly (instead of throwing) once the iterator
          // reports that no more messages will come.
          // NOTE(review): this blocks the thread running the enumerator --
          // consider a dedicated execution context if many clients connect.
          if (connected && it.hasNext) Some((it, it.next().message())) else None
        }
      }
      .getOrElse(Enumerator.eof[String])

    endOnDisconnection -> pipeFromKafka
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment