Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@andreionut
Created January 27, 2016 17:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save andreionut/98ff613e7bb1b20135eb to your computer and use it in GitHub Desktop.
Save andreionut/98ff613e7bb1b20135eb to your computer and use it in GitHub Desktop.
import _root_.kafka.serializer.{StringDecoder, StringEncoder}
import akka.actor.SupervisorStrategy.{Restart, Resume}
import akka.actor.{Props, OneForOneStrategy, SupervisorStrategy, ActorSystem}
import akka.stream.actor.ActorSubscriber
import akka.stream.{ActorMaterializerSettings, Supervision, ActorMaterializer}
import akka.stream.scaladsl.{Sink, Source}
import com.softwaremill.react.kafka.KafkaMessages.StringKafkaMessage
import com.softwaremill.react.kafka.{ConsumerProperties, ProducerProperties, ReactiveKafka}
import org.reactivestreams.{Publisher, Subscriber}
import scala.language.postfixOps
/** Demo stream: consumes strings from the `lowercaseStrings` Kafka topic, upper-cases each
  * one and publishes the result to the `uppercaseStrings` topic.
  *
  * A message equal to "b" deliberately throws, to exercise the stream supervision decider.
  */
object Uppercase extends App {
  implicit val actorSystem: ActorSystem = ActorSystem("ReactiveKafka")

  // A Supervision.Decider is a function from Throwable to a directive, so one case covers
  // every failure handed to it. The failure is printed so the cause of a restart is visible.
  val decider: Supervision.Decider = {
    case e: Throwable =>
      println(s"Stream Supervision Decider to the rescue!!! (case Throwable): $e")
      Supervision.Restart
  }

  implicit val materializer: ActorMaterializer =
    ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))

  val kafka = new ReactiveKafka()

  // Source side: messages read from the `lowercaseStrings` topic.
  val publisher: Publisher[StringKafkaMessage] = kafka.consume(ConsumerProperties(
    brokerList = "192.168.42.45:9092",
    zooKeeperHost = "192.168.42.45:2181/kafka",
    topic = "lowercaseStrings",
    groupId = "groupName",
    decoder = new StringDecoder()
  ))

  // Sink side: strings written to the `uppercaseStrings` topic.
  val subscriber: Subscriber[String] = kafka.publish(ProducerProperties(
    brokerList = "192.168.42.45:9092",
    topic = "uppercaseStrings",
    encoder = new StringEncoder()
  ))

  Source.fromPublisher(publisher)
    .map { m =>
      // Crude per-element delay for the demo; note that it blocks the calling thread.
      Thread.sleep(50)
      // Read the payload once and reuse it for the check, the log line and the result.
      val text = m.message()
      if (text == "b") throw new IllegalArgumentException("b encountered. We can't have that!")
      println(text)
      text.toUpperCase
    }
    .to(Sink.fromSubscriber(subscriber))
    .run()
}
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.stream.ActorMaterializer
import com.softwaremill.react.kafka.{ConsumerProperties, ProducerProperties, ReactiveKafka}
/** Actor that can create the reactive-kafka producer actor as one of its own children, so
  * that the child's failures are decided by this actor's `supervisorStrategy`.
  */
class Handler extends Actor {
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Every failure reaching this strategy is reported on stdout and answered with Restart.
  override val supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case _ =>
      println("Actor Supervision Strategy to the rescue!!! (case exception)")
      Restart // Your custom error handling
  }

  /** Creates the Kafka producer actor for the `uppercaseStrings` topic as a child of this
    * actor. Not called from the code visible in this file.
    *
    * @return reference to the newly created child actor
    */
  def createSupervisedSubscriberActor(): ActorRef = {
    val kafka = new ReactiveKafka()
    // subscriber
    val subscriberProperties = ProducerProperties(
      brokerList = "192.168.42.45:9092",
      topic = "uppercaseStrings",
      encoder = new StringEncoder()
    )
    val subscriberActorProps: Props = kafka.producerActorProps(subscriberProperties)
    context.actorOf(subscriberActorProps)
  }

  // Placeholder behaviour: any incoming message only prints a marker line.
  override def receive: Receive = {
    case _ => println("Stuff")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment