Skip to content

Instantly share code, notes, and snippets.

@mrwillis
Created June 2, 2018 21:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mrwillis/e7948882d8d6b8fa78e7f86ace71d11a to your computer and use it in GitHub Desktop.
Save mrwillis/e7948882d8d6b8fa78e7f86ace71d11a to your computer and use it in GitHub Desktop.
Kafka BroadcastHub (reactive-kafka)
import akka.NotUsed
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, KafkaConsumerActor, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BroadcastHub, Keep, RunnableGraph, Sink, Source}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
/** Demo: fan a single Kafka partition out to several in-process consumers.
  *
  * One externally managed `KafkaConsumerActor` feeds a `BroadcastHub`; the
  * `Source` materialized by the hub is then run twice, each time with its own
  * printing sink.
  */
object Main {
  // Explicit types on implicit definitions keep implicit resolution predictable
  // (and are mandatory in Scala 3).
  implicit val system: ActorSystem = ActorSystem("System")
  implicit val mat: ActorMaterializer = ActorMaterializer()

  /** Wires up the Kafka source, the broadcast hub and two printing consumers.
    *
    * When both consumer streams have completed, or as soon as one is observed
    * to have failed, the failure (if any) is logged and the actor system is
    * terminated so the JVM does not hang.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    import scala.util.{Failure, Success}
    import system.dispatcher // execution context for the completion callback

    // A fresh random group id is generated on every run.
    val groupId = scala.util.Random.alphanumeric.take(10).mkString

    val consumerSettings =
      ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
        .withGroupId(groupId)

    val kafkaConsumerActor =
      system.actorOf(KafkaConsumerActor.props(consumerSettings))

    // Reads partition 0 of "topic" through the shared consumer actor.
    val producer: Source[ConsumerRecord[String, String], Consumer.Control] =
      Consumer.plainExternalSource[String, String](
        kafkaConsumerActor,
        Subscriptions.assignment(new TopicPartition("topic", 0))
      )

    // Keep.right keeps the hub's materialized value: the Source that
    // downstream consumers attach to.
    val hub: RunnableGraph[Source[ConsumerRecord[String, String], NotUsed]] =
      producer.toMat(BroadcastHub.sink)(Keep.right)

    val source: Source[ConsumerRecord[String, String], NotUsed] =
      hub.run()

    val firstDone =
      source.runWith(Sink.foreach(data => println(s"Consumer 1${data.value}")))
    val secondDone =
      source.runWith(Sink.foreach(data => println(s"Consumer 2${data.value}")))

    // Previously both completion futures were discarded, so a failed stream
    // went unnoticed and the actor system kept the process alive forever.
    firstDone.zip(secondDone).onComplete {
      case Failure(cause) =>
        system.log.error(cause, "Kafka broadcast stream failed")
        system.terminate()
      case Success(_) =>
        system.terminate()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment