Skip to content

Instantly share code, notes, and snippets.

@fsarradin
Created February 15, 2016 10:30
Show Gist options
  • Save fsarradin/24d71255e7ace544d5fa to your computer and use it in GitHub Desktop.
Save fsarradin/24d71255e7ace544d5fa to your computer and use it in GitHub Desktop.
Reactive Kafka with Akka Stream
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import com.softwaremill.react.kafka.{ConsumerProperties, ReactiveKafka}
import org.apache.kafka.common.serialization.StringDeserializer
object MainConsumer {
def main(args: Array[String]): Unit = {
implicit val actorSystem = ActorSystem("TestKafka")
implicit val materializer = ActorMaterializer()
val kafka = new ReactiveKafka()
val publisher = kafka.consume(ConsumerProperties(
bootstrapServers = "localhost:9092",
topic = "chat",
groupId = "g1",
valueDeserializer = new StringDeserializer
).readFromEndOfStream())
Source.fromPublisher(publisher)
.to(Sink.foreach(println))
.run()
}
}
import akka.actor._
import akka.stream.ActorMaterializer
import akka.stream.actor.{ActorPublisher, ActorPublisherMessage}
import akka.stream.scaladsl.{Sink, Source}
import com.softwaremill.react.kafka.KafkaMessages.StringProducerMessage
import com.softwaremill.react.kafka.{ProducerMessage, ProducerProperties, ReactiveKafka}
import org.apache.kafka.common.serialization.StringSerializer
import org.reactivestreams.Subscriber
object MainProducer {
class StringPubliser extends ActorPublisher[String] {
override def receive = {
case ActorPublisherMessage.Request(_) => sendValues()
}
def sendValues(): Unit = {
var index: Int = 0
while (isActive && totalDemand > 0) {
onNext(s"value #$index")
index += 1
Thread.sleep(300)
}
}
}
def main(args: Array[String]): Unit = {
implicit val actorSystem = ActorSystem("TestKafka")
implicit val materializer = ActorMaterializer()
val kafka = new ReactiveKafka()
val subscriber: Subscriber[StringProducerMessage] = kafka.publish(ProducerProperties(
bootstrapServers = "localhost:9092",
topic = "chat",
valueSerializer = new StringSerializer()
))
val stringPublisherActor: ActorRef = actorSystem.actorOf(Props(new StringPubliser))
Source.fromPublisher(ActorPublisher[String](stringPublisherActor))
.map(ProducerMessage(_))
.alsoTo(Sink.foreach(println))
.to(Sink.fromSubscriber(subscriber))
.run()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment