@darkjh
Created January 22, 2016 14:49
import java.util.concurrent.Executors
import java.util.{Collection, Properties}

import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

import scala.collection.JavaConversions._
class Consumer(threadId: String, topic: String, configs: Properties) extends Runnable {

  // gracefully terminate the consumer: commit offsets before the JVM exits
  // (note: KafkaConsumer is not thread-safe, so this hook thread may race
  // with the polling thread; the commit is best-effort)
  Runtime.getRuntime.addShutdownHook(new Thread() {
    override def run(): Unit = {
      commitOffset()
    }
  })

  val name = s"$topic-$threadId"

  val listener = new ConsumerRebalanceListener {
    def onPartitionsAssigned(partitions: Collection[TopicPartition]): Unit = {
      println(s"$name got assigned ${partitions.toList}")
    }

    def onPartitionsRevoked(partitions: Collection[TopicPartition]): Unit = {
      if (partitions.isEmpty) {
        return
      }
      // flush local state and commit offsets before the revoked partitions
      // are reassigned to another consumer in the group
      println(s"$name flush @ $count")
      kafkaConsumer.commitSync()
      println(s"$name committed offset for ${partitions.toList}")
    }
  }

  // simulate a local state
  var count = 0L

  val kafkaConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](configs)
  kafkaConsumer.subscribe(List(topic), listener)

  def run(): Unit = {
    while (!Thread.currentThread().isInterrupted) {
      try {
        val records = kafkaConsumer.poll(100)
        println(s"$name pulled ${records.size} msgs ...")
        Thread.sleep(50) // simulate some processing time
        for (msg <- records) {
          count += 1
        }
      } catch {
        case _: InterruptedException =>
          // restore the interrupt flag so the while condition can observe it
          Thread.currentThread().interrupt()
        case e: Exception =>
          println("Error during processing of the message: " + e)
      }
    }
  }

  def commitOffset(): Unit = {
    kafkaConsumer.commitSync()
  }
}
object Test extends App {
  val topic = "balance"

  val props = new Properties()
  // the zookeeper.* settings below target the old (0.8) consumer and are
  // ignored by the 0.9 KafkaConsumer
  props.put("zookeeper.connect", "localhost:2181")
  props.put("group.id", "balance-test")
  props.put("zookeeper.session.timeout.ms", "400")
  props.put("zookeeper.sync.time.ms", "200")
  props.put("auto.commit.interval.ms", "1000")

  // kafka 0.9.0.0 (new consumer) settings
  props.put("bootstrap.servers", "localhost:9092")
  props.put("auto.offset.reset", "earliest")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  // disable auto commit; offsets are committed manually on rebalance and shutdown
  props.put("enable.auto.commit", "false")

  // launch 2 consumers of the same group in this process to trigger a rebalance
  val executor = Executors.newFixedThreadPool(2)
  executor.submit(new Consumer("0", topic, props))
  executor.submit(new Consumer("1", topic, props))
}