@gzoller
Last active April 16, 2016 18:57
Kafka 0.9.0.1 Consumer
package com.cof.kafka

import java.util.Collection
import java.util.concurrent.{ LinkedBlockingQueue, TimeUnit }

import scala.collection.JavaConversions._
import scala.collection.mutable.{ Map => MMap }
import scala.concurrent.{ Await, Promise }

import akka.stream.scaladsl.Source

import org.apache.kafka.clients.consumer.{ ConsumerRebalanceListener, ConsumerRecord, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback }
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ ByteArrayDeserializer, Deserializer, StringDeserializer }
// Runs a KafkaConsumer on its own thread. The KafkaConsumer is not thread-safe,
// so all interaction with it goes through the blocking queue (via !) and is
// handled only from this thread.
case class KafkaThread[V](
    host:         String,
    groupId:      String,
    topic:        String,
    deserializer: Deserializer[V],
    properties:   Map[String, String]
) extends Runnable {

  private val q = new LinkedBlockingQueue[AnyRef]()
  private var running = true

  def run() {
    println("Bootstrap: " + host)
    val consumer = new KafkaConsumer[Array[Byte], V](
      properties ++ Map(
        "bootstrap.servers"       -> host,
        "enable.auto.commit"      -> "false",
        "auto.commit.interval.ms" -> "1000",
        "auto.offset.reset"       -> "earliest",
        "group.id"                -> groupId
      ),
      new ByteArrayDeserializer,
      deserializer
    )
    consumer.subscribe(List(topic))

    while (running) {
      // Polling my blocking queue...not Kafka here
      q.poll(100, TimeUnit.MILLISECONDS) match {

        // Commit a record
        case cr: ConsumerRecord[_, _] =>
          val off = cr.offset() + 1 // commit the offset of the *next* record to consume
          val offsets = java.util.Collections.singletonMap(
            new TopicPartition(cr.topic(), cr.partition()),
            new OffsetAndMetadata(off)
          )
          // commitSync() is slow but works w/no issues
          // consumer.commitSync(offsets)
          // commitAsync() is fast but is dropping commits!
          consumer.commitAsync(offsets, new OffsetCommitCallback {
            def onComplete(offsets: java.util.Map[TopicPartition, OffsetAndMetadata], ex: Exception) {
              if (ex != null) {
                print(s"ERROR [$offsets]: ")
                ex.printStackTrace()
              }
            }
          })

        // Request the next iterator of messages from Kafka (consumer.poll())
        case p: Promise[_] =>
          p.asInstanceOf[Promise[Iterator[ConsumerRecord[Array[Byte], V]]]]
            .success(consumer.poll(100).iterator)

        case null => // queue was empty...try again
      }
    }
    println("::: Consumer Stopped :::")
    consumer.close()
  }
  // Post a ConsumerRecord (to be committed) or a Promise (to be fulfilled with the
  // next batch from consumer.poll()) to the consumer thread
  def !(a: AnyRef) = q.add(a)

  def stop() = {
    running = false
  }
}
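
For reference, below is a minimal sketch (not part of the original gist) of how a KafkaThread might be driven by a caller: start it on its own thread, post a Promise to request the next batch from consumer.poll(), post each processed record back so its offset gets committed, then call stop(). The broker address, group id, topic name, and the use of StringDeserializer are illustrative assumptions; the sketch lives in the same package so it reuses the imports above.

// Hypothetical driver for KafkaThread, assuming a local broker and a String-valued topic.
object KafkaThreadExample extends App {
  import scala.concurrent.duration._

  val kt = KafkaThread[String](
    host         = "localhost:9092", // assumed broker address
    groupId      = "example-group",  // assumed consumer group
    topic        = "example-topic",  // assumed topic
    deserializer = new StringDeserializer,
    properties   = Map.empty
  )
  new Thread(kt).start()

  // Ask the consumer thread for the next batch of records.
  // (The first poll may come back empty while the group is still rebalancing.)
  val p = Promise[Iterator[ConsumerRecord[Array[Byte], String]]]()
  kt ! p
  val records = Await.result(p.future, 10.seconds)

  // Process each record, then post it back so its offset is committed.
  records.foreach { cr =>
    println(s"${cr.topic()}/${cr.partition()}@${cr.offset()}: ${cr.value()}")
    kt ! cr
  }

  kt.stop()
}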