Skip to content

Instantly share code, notes, and snippets.

@mardambey
Last active August 3, 2019 05:03
Show Gist options
  • Star 14 You must be signed in to star a gist
  • Fork 9 You must be signed in to fork a gist
  • Save mardambey/2564040 to your computer and use it in GitHub Desktop.
Save mardambey/2564040 to your computer and use it in GitHub Desktop.
Akka 2.0 actors with Kafka 0.7.x backed durable mailboxes.
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.agent.Agent
import com.typesafe.config.ConfigFactory
import akka.event.Logging
import akka.actor.Props
import kafka.utils.Utils
import java.nio.ByteBuffer
/**
 * Demo entry point: starts an actor system, creates a [[KafkaActor]] on the
 * `kafka-dispatcher` dispatcher and sends it one text message per second
 * until the JVM shutdown hook flips the `loop` agent to `false`.
 */
object AkkaKafkaMailboxTest extends App {
  implicit val system = ActorSystem("KafkaAkkaSystem")

  // The dispatcher id refers to the `kafka-dispatcher` configuration section.
  val kafka = system.actorOf(Props[KafkaActor].withDispatcher("kafka-dispatcher"), name = "TestActor")

  // The Agent reference itself is never reassigned (only its content is
  // updated through `send`), so it is a `val` rather than a `var`.
  val loop = Agent(true)

  // On JVM shutdown: ask the send loop to stop, then stop the actor.
  sys.addShutdownHook {
    loop send false
    system.stop(kafka)
    println("Shutting down...")
  }

  // `loop()` reads the agent's current value; the timestamp is in seconds.
  while (loop()) {
    kafka ! ("Testing 1 2 3 @ " + System.currentTimeMillis() / 1000)
    Thread.sleep(1000)
  }
}
/**
 * Actor that prints every message it receives to standard output.
 *
 * Byte arrays are decoded as UTF-8 text; any other message is printed using
 * its string representation. A log line is written when the actor stops.
 */
class KafkaActor extends Actor {
  val log = Logging(context.system, this)

  /** Accepts any message and prints its description. */
  def receive = {
    case incoming => println(describe(incoming))
  }

  /** Builds the line printed for a received message. */
  private def describe(incoming: Any): String = incoming match {
    case bytes: Array[Byte] => "Got msg: " + new String(bytes, "UTF-8")
    case other              => "Got unknown type msg: " + other
  }

  override def postStop(): Unit =
    log.info("Shutting down Kafka actor...")
}
# Dispatcher selected by the demo through Props[KafkaActor].withDispatcher("kafka-dispatcher").
kafka-dispatcher {
# Fully-qualified class name of the mailbox type (KafkaBasedMailboxType in package akka.actor.mailbox).
mailbox-type = "akka.actor.mailbox.KafkaBasedMailboxType"
# NOTE(review): presumably the number of messages an actor handles before the
# dispatcher moves on to the next one - confirm against the Akka dispatcher docs.
throughput = 1
}
akka {
actor {
mailbox {
kafka {
# ZooKeeper connect string. KafkaBasedMailboxSettings reads it with
# getString("zkCluster"); the message queue passes it to Kafka as "zk.connect".
zkCluster = "localhost:2181"
}
}
}
loglevel = INFO
}
package akka.actor.mailbox
import akka.AkkaException
import akka.actor.ActorContext
import akka.dispatch.Envelope
import akka.event.Logging
import akka.actor.ActorRef
import akka.dispatch.MailboxType
import com.typesafe.config.Config
import akka.util.NonFatal
import akka.config.ConfigurationException
import akka.dispatch.MessageQueue
import akka.actor.ActorSystem
import akka.actor.mailbox.DurableMessageSerialization
import akka.actor.mailbox.DurableMessageQueue
import kafka.consumer.ConsumerConnector
import java.util.Properties
import kafka.message.ByteBufferMessageSet
import kafka.message.Message
import kafka.message.NoCompressionCodec
import kafka.consumer.ConsumerConfig
import kafka.consumer.Consumer._
import kafka.producer.ProducerConfig
import kafka.producer.Producer
import kafka.producer.ProducerData
import kafka.utils.Utils
import kafka.consumer.ConsumerConnector
import akka.remote.MessageSerializer
import akka.serialization.SerializationExtension
/**
 * Exception type of the Kafka-backed mailbox. KafkaBasedMessageQueue throws it
 * with a "Could not connect to Kafka, due to: ..." message.
 *
 * @param message human-readable description of the failure
 */
class KafkaBasedMailboxException(message: String) extends AkkaException(message)
/**
 * Mailbox type that backs an actor's mailbox with a [[KafkaBasedMessageQueue]].
 *
 * @param systemSettings settings of the owning actor system
 * @param config         configuration handed to [[KafkaBasedMailboxSettings]]
 */
class KafkaBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType {
  private val settings = new KafkaBasedMailboxSettings(systemSettings, config)

  /**
   * Creates the message queue for the given owner.
   *
   * @param owner context of the actor that will own the mailbox
   * @return a new Kafka-backed message queue
   * @throws ConfigurationException if no owner is supplied
   */
  override def create(owner: Option[ActorContext]): MessageQueue = {
    def missingOwner: Nothing =
      throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")

    owner.map(ctx => new KafkaBasedMessageQueue(ctx, settings)).getOrElse(missingOwner)
  }
}
/**
 * Durable message queue that stores an actor's envelopes in a Kafka topic.
 *
 * Envelopes are serialized and published to the topic called `name`; they are
 * read back through a consumer stream on the same topic, with `name` also used
 * as the consumer group id.
 *
 * Failure policy (best effort, as in the original implementation): a failed
 * publish is logged and the message is dropped; a failed read yields `null`
 * ("no message"). Fatal errors are no longer swallowed.
 *
 * NOTE(review): depending on the Kafka consumer configuration, `hasNext()` /
 * `next()` on the consumer iterator may block until a message arrives -
 * confirm against the Kafka consumer documentation.
 *
 * @param _owner   context of the actor owning this mailbox
 * @param settings Kafka mailbox settings (provides `zkCluster`)
 */
class KafkaBasedMessageQueue(_owner: ActorContext, val settings: KafkaBasedMailboxSettings) extends DurableMessageQueue(_owner) with DurableMessageSerialization {

  // Declared before the consumer/producer are created: createConsumer() and
  // createProducer() log on failure, and a `val` initialised further down
  // would still be null at that point.
  val log = Logging(system, "KafkaBasedMessageQueue")

  // Assigned by createConsumer(). Declared ahead of the fields that call
  // createConsumer() so the value can never be reset by a later initialiser.
  private var consumerConnector: ConsumerConnector = _

  private var consumer = createConsumer()
  private var producer = createProducer()

  // None when no stream could be obtained, instead of failing on `.get`.
  private var consumerIter = consumer.map(_.iterator)

  /**
   * Serializes the envelope and publishes it to the topic `name`.
   *
   * Failures are logged and the message is dropped; nothing is thrown for
   * non-fatal errors.
   */
  def enqueue(receiver: ActorRef, envelope: Envelope): Unit = {
    withErrorHandling {
      producer match {
        case Some(kafkaProducer) =>
          try {
            val msg = serialize(envelope)
            kafkaProducer.send(new ProducerData(name, new Message(msg)))
          } catch {
            // Best effort: keep going, but record why the publish failed.
            case NonFatal(e) => log.warning("Unable to publish from producer: {}", e)
          }
        case None =>
          // Previously the message vanished without a trace in this case.
          log.warning("No Kafka producer available, dropping message for " + name)
      }
    }
  }

  /**
   * Reads the next envelope from the consumer stream and commits the offsets.
   *
   * @return the deserialized envelope, or `null` when no message could be read
   */
  def dequeue(): Envelope = withErrorHandling {
    try {
      consumerIter match {
        case Some(messages) =>
          val item = messages.next()
          val payload = item.payload
          // `remaining()` is the number of readable bytes; sizing the array by
          // `capacity()` underflows whenever the buffer's limit is smaller.
          val bytes = new Array[Byte](payload.remaining())
          payload.get(bytes)
          consumerConnector.commitOffsets
          deserialize(bytes)
        case None => null
      }
    } catch {
      case e: java.util.NoSuchElementException => null
      case NonFatal(e) =>
        // Same "no message" result as before, but the cause is now logged and
        // fatal throwables are allowed to propagate.
        log.error(e, "Couldn't dequeue from Kafka-based mailbox")
        null
    }
  }

  /**
   * Reports 1 when the consumer iterator has a next element, otherwise 0
   * (also 0 when there is no consumer stream or the check fails).
   */
  def numberOfMessages: Int = withErrorHandling {
    try {
      if (consumerIter.exists(_.hasNext())) 1 else 0
    } catch {
      case NonFatal(_) => 0
    }
  }

  /** True when [[numberOfMessages]] is greater than zero. */
  def hasMessages: Boolean = numberOfMessages > 0

  /**
   * Creates a Kafka consumer connector for the given consumer group, using the
   * ZooKeeper connect string from `settings.zkCluster`.
   *
   * @param groupId value for the consumer's "groupid" property
   */
  def getConsumerConnector(groupId: String): ConsumerConnector = {
    val consumerProps = new Properties
    consumerProps.put("zk.connect", settings.zkCluster)
    consumerProps.put("zk.connectiontimeout.ms", "1000000")
    consumerProps.put("groupid", groupId)
    val consumerConfig = new ConsumerConfig(consumerProps)
    create(consumerConfig)
  }

  /**
   * Creates a Kafka producer connected through `settings.zkCluster`.
   *
   * @return the producer, or None when it could not be constructed
   */
  def createProducer() = {
    val producerProps = new Properties()
    producerProps.put("zk.connect", settings.zkCluster)
    val producerConfig = new ProducerConfig(producerProps)
    try {
      Some(new Producer[String, Message](producerConfig))
    } catch {
      case NonFatal(e) =>
        log.error(e, "Unable to create Kafka producer")
        None
    }
  }

  /**
   * Creates a new consumer connector (stored in `consumerConnector`) and asks
   * it for a single message stream on the topic `name`.
   *
   * @return the stream, or None when exactly one stream was not returned
   */
  def createConsumer() = {
    val topicThreadCount = Map(name -> 1)
    consumerConnector = getConsumerConnector(name)
    val topicMessageStreams = consumerConnector.createMessageStreams(topicThreadCount)
    topicMessageStreams.get(name) match {
      case Some(List(stream)) => Some(stream)
      case _ =>
        log.error("Did not get a valid stream from topic " + name)
        None
    }
  }

  /** Shuts down the consumer connector and closes the producer, logging (not throwing) on failure. */
  private def releaseConnections(): Unit = {
    Option(consumerConnector) foreach { connector =>
      try connector.shutdown() catch {
        case NonFatal(e) => log.warning("Error shutting down Kafka consumer connector: {}", e)
      }
    }
    // Cleared so that a connector which has been shut down is never reused.
    consumerConnector = null
    consumer = None
    consumerIter = None

    producer foreach { kafkaProducer =>
      try kafkaProducer.close() catch {
        case NonFatal(e) => log.warning("Error closing Kafka producer: {}", e)
      }
    }
    producer = None
  }

  /** Drops the current Kafka connections and creates fresh ones, including a fresh iterator. */
  private def reconnect(): Unit = {
    releaseConnections()
    consumer = createConsumer()
    consumerIter = consumer.map(_.iterator)
    producer = createProducer()
  }

  /**
   * Runs `body`; on a non-fatal failure reconnects to Kafka and runs it once
   * more.
   *
   * @throws KafkaBasedMailboxException if reconnecting or the retry fails
   */
  private def withErrorHandling[T](body: => T): T = {
    try {
      body
    } catch {
      case NonFatal(first) =>
        log.warning("Kafka operation failed, reconnecting and retrying: {}", first)
        try {
          reconnect()
          body
        } catch {
          case NonFatal(e) =>
            val error = new KafkaBasedMailboxException("Could not connect to Kafka, due to: " + e.getMessage)
            log.error(error, error.getMessage)
            throw error
        }
    }
  }

  /**
   * Commits the consumer offsets and releases the Kafka connections held by
   * this queue.
   */
  def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = {
    try {
      Option(consumerConnector).foreach(_.commitOffsets)
    } finally {
      // Release the connector and producer even when the final commit fails.
      releaseConnections()
    }
  }
}
package akka.actor.mailbox
import com.typesafe.config.Config
import akka.actor.ActorSystem
import akka.actor.mailbox.DurableMailboxSettings
/**
 * Settings for the Kafka-backed durable mailbox.
 *
 * @param systemSettings settings of the owning actor system
 * @param userConfig     user-supplied configuration
 */
class KafkaBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config)
  extends DurableMailboxSettings {

  /**
   * Name of this mailbox implementation.
   * NOTE(review): presumably selects the `kafka` section under
   * `akka.actor.mailbox` during `initialize` - confirm in DurableMailboxSettings.
   */
  def name: String = "kafka"

  val config = initialize

  /** Value of the `zkCluster` key of the initialized configuration. */
  val zkCluster: String = config.getString("zkCluster")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment