Created
February 15, 2016 10:30
-
-
Save fsarradin/24d71255e7ace544d5fa to your computer and use it in GitHub Desktop.
Reactive Kafka with Akka Stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.ActorSystem | |
import akka.stream.ActorMaterializer | |
import akka.stream.scaladsl.{Sink, Source} | |
import com.softwaremill.react.kafka.{ConsumerProperties, ReactiveKafka} | |
import org.apache.kafka.common.serialization.StringDeserializer | |
/**
 * Demo consumer: subscribes to the Kafka topic "chat" through reactive-kafka
 * and prints every element received from the resulting stream to stdout.
 */
object MainConsumer {

  /**
   * Wires a Kafka-backed publisher into an Akka Stream and runs it.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Both implicits are picked up by ReactiveKafka and by the stream when it is run.
    implicit val system: ActorSystem = ActorSystem("TestKafka")
    implicit val streamMaterializer: ActorMaterializer = ActorMaterializer()

    // Connection settings for the broker on localhost, consumer group "g1".
    // NOTE(review): readFromEndOfStream() presumably starts at the latest offset,
    // skipping messages already in the topic -- confirm against reactive-kafka docs.
    val consumerSettings = ConsumerProperties(
      bootstrapServers = "localhost:9092",
      topic = "chat",
      groupId = "g1",
      valueDeserializer = new StringDeserializer
    ).readFromEndOfStream()

    val reactiveKafka = new ReactiveKafka()
    val kafkaPublisher = reactiveKafka.consume(consumerSettings)

    // Source(Kafka publisher) -> Sink(println), described first, then materialized.
    val printEverything = Source
      .fromPublisher(kafkaPublisher)
      .to(Sink.foreach(println))

    printEverything.run()
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor._ | |
import akka.stream.ActorMaterializer | |
import akka.stream.actor.{ActorPublisher, ActorPublisherMessage} | |
import akka.stream.scaladsl.{Sink, Source} | |
import com.softwaremill.react.kafka.KafkaMessages.StringProducerMessage | |
import com.softwaremill.react.kafka.{ProducerMessage, ProducerProperties, ReactiveKafka} | |
import org.apache.kafka.common.serialization.StringSerializer | |
import org.reactivestreams.Subscriber | |
/**
 * Demo producer: generates the strings "value #0", "value #1", ... at a fixed
 * pace, prints each outgoing message and publishes it to the Kafka topic "chat".
 */
object MainProducer {

  import scala.concurrent.duration._

  /** Pause between two consecutive generated values. */
  private val EmitInterval: FiniteDuration = 300.millis

  /** Internal timer message telling the publisher actor to try emitting one value. */
  private case object Tick

  /**
   * Actor-backed publisher of numbered strings.
   *
   * A value is emitted on each [[Tick]] only when the stream is active and the
   * downstream has outstanding demand, so back-pressure is respected without
   * blocking the actor's thread. The running counter is actor state, so the
   * numbering keeps increasing across successive `Request` messages.
   *
   * (The class name keeps its original spelling for backward compatibility.)
   */
  class StringPubliser extends ActorPublisher[String] {

    // Execution context required by the scheduler.
    import context.dispatcher

    // Number used for the next emitted value.
    private var index: Int = 0

    // Periodic self-addressed Tick; cancelled in postStop.
    private val ticker: Cancellable =
      context.system.scheduler.schedule(Duration.Zero, EmitInterval, self, Tick)

    override def receive: Receive = {
      // Demand is accumulated in totalDemand before this handler runs;
      // the next Tick will act on it, so there is nothing to do here.
      case ActorPublisherMessage.Request(_) =>
      case Tick => sendValues()
      // Downstream is gone: stop instead of lingering with a live timer.
      case ActorPublisherMessage.Cancel => context.stop(self)
    }

    /** Stops the periodic timer when the actor terminates. */
    override def postStop(): Unit = {
      ticker.cancel()
      super.postStop()
    }

    /**
     * Emits the next numbered value if the publisher is active and there is
     * outstanding demand; otherwise does nothing. Never blocks.
     */
    def sendValues(): Unit = {
      if (isActive && totalDemand > 0) {
        onNext(s"value #$index")
        index += 1
      }
    }
  }

  /**
   * Wires the publisher actor into an Akka Stream that prints each message
   * and forwards it to a Kafka subscriber.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    implicit val actorSystem = ActorSystem("TestKafka")
    implicit val materializer = ActorMaterializer()

    val kafka = new ReactiveKafka()
    val subscriber: Subscriber[StringProducerMessage] = kafka.publish(ProducerProperties(
      bootstrapServers = "localhost:9092",
      topic = "chat",
      valueSerializer = new StringSerializer()
    ))

    val stringPublisherActor: ActorRef = actorSystem.actorOf(Props(new StringPubliser))

    Source.fromPublisher(ActorPublisher[String](stringPublisherActor))
      .map(ProducerMessage(_))
      .alsoTo(Sink.foreach(println)) // echo every outgoing message to stdout
      .to(Sink.fromSubscriber(subscriber))
      .run()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment