Skip to content

Instantly share code, notes, and snippets.

@darkjh
Last active February 1, 2016 15:38
Show Gist options
  • Save darkjh/437ac72cdd4b1c4ca2e7 to your computer and use it in GitHub Desktop.
Save darkjh/437ac72cdd4b1c4ca2e7 to your computer and use it in GitHub Desktop.
import java.util.concurrent.{TimeUnit, Executors}
import java.util.{Collection, Properties}
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.WakeupException
import scala.collection.JavaConversions._
/**
 * A runnable Kafka consumer that counts the records it polls from `topic`
 * and commits offsets manually.
 *
 * Offsets are committed every `flushNumMsg` counted records, whenever
 * partitions are revoked during a rebalance, and once more on shutdown.
 * The loop is stopped by calling `kafkaConsumer.wakeup()` from another
 * thread, which surfaces as a `WakeupException` inside `run()`.
 *
 * @param threadId identifier used together with `topic` to build `name`
 * @param topic    topic to subscribe to
 * @param configs  properties handed to the underlying `KafkaConsumer`
 */
class Consumer(threadId: String, topic: String, configs: Properties) extends Runnable {
  val name = s"$topic-$threadId"

  // Commits the current position before partitions are taken away.
  val listener = new ConsumerRebalanceListener {
    def onPartitionsAssigned(partitions: Collection[TopicPartition]): Unit = {
      println(s"$name got assigned ${partitions.toList}")
    }

    def onPartitionsRevoked(partitions: Collection[TopicPartition]): Unit = {
      // Nothing to commit when no partition is being revoked.
      if (!partitions.isEmpty) {
        kafkaConsumer.commitSync()
        println(s"$name committed offset for ${partitions.toList}")
      }
    }
  }

  // flush and commit on every 80000 msgs
  val flushNumMsg = 80000

  // simulate a local state
  var count = 0L

  val kafkaConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](configs)
  kafkaConsumer.subscribe(List(topic), listener)

  /**
   * Polls until a `WakeupException` is raised, then commits, closes the
   * consumer, prints the final count and writes it to a tracking file.
   */
  def run(): Unit = {
    try {
      while (true) {
        try {
          val records = kafkaConsumer.poll(1000)
          Thread.sleep(20)
          for (_ <- records) {
            count += 1
            if (count % flushNumMsg == 0) {
              commitOffset()
            }
            if (count % 5000 == 0) {
              println(s"$name fetched $count ...")
            }
          }
        } catch {
          case e: WakeupException =>
            // let the outer handler end the loop
            throw e
          case e: Exception =>
            println("Error during processing of the message: " + e)
        }
      }
    } catch {
      case e: WakeupException =>
      // ignore for shutdown
    } finally {
      // Close the consumer even when the final commit throws; a commit
      // failure still propagates once the consumer has been closed.
      try commitOffset()
      finally kafkaConsumer.close()
      println(count)
      writeCount()
    }
  }

  /**
   * Synchronously commits offsets. If the commit is interrupted by a
   * wakeup, the commit is retried once and the `WakeupException` is
   * rethrown so the caller can still shut down.
   */
  def commitOffset(): Unit = {
    try {
      kafkaConsumer.commitSync()
    } catch {
      case e: WakeupException =>
        // retry the commit, then rethrow for shutdown
        kafkaConsumer.commitSync()
        throw e
    }
  }

  /** Writes the final count to `/tmp/track/<current thread name>`. */
  private def writeCount(): Unit = {
    import java.io.{File, PrintWriter}
    val target = new File(s"/tmp/track/${Thread.currentThread().getName}")
    // Create the tracking directory if missing; otherwise opening the file fails.
    target.getParentFile.mkdirs()
    val p = new PrintWriter(target)
    try p.write(s"$count\n")
    finally p.close()
  }
}
/**
 * Launches two [[Consumer]] instances on the "continue" topic inside one
 * process and registers a JVM shutdown hook that wakes them up and waits
 * for the worker threads to finish.
 */
object Test extends App {
  val topic = "continue"
  val groupId = s"$topic-test"

  val props = new Properties()
  Seq(
    // NOTE(review): the zookeeper.* keys look like leftovers from the old
    // consumer API — confirm whether they are still needed.
    "zookeeper.connect" -> "localhost:2181",
    "group.id" -> groupId,
    "zookeeper.session.timeout.ms" -> "400",
    "zookeeper.sync.time.ms" -> "200",
    "auto.commit.interval.ms" -> "1000",
    // kafka 0.9.0.0
    "bootstrap.servers" -> "localhost:9092",
    "auto.offset.reset" -> "earliest",
    "key.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
    // disable auto commit
    "enable.auto.commit" -> "false"
  ).foreach { case (key, value) => props.put(key, value) }

  // launch 2 consumers in this process
  val executor = Executors.newFixedThreadPool(2)
  val consumers = List("0", "1").map(id => new Consumer(id, topic, props))
  for (consumer <- consumers) {
    executor.submit(consumer)
  }

  // gracefully terminates the consumer thread: wake every consumer out of
  // its poll loop, then wait up to 10 seconds for the pool to drain
  private val stopConsumers = new Runnable {
    def run(): Unit = {
      for (consumer <- consumers) {
        consumer.kafkaConsumer.wakeup()
      }
      executor.shutdown()
      executor.awaitTermination(10, TimeUnit.SECONDS)
    }
  }
  Runtime.getRuntime.addShutdownHook(new Thread(stopConsumers))
}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import com.typesafe.scalalogging.StrictLogging
import java.util.{HashMap, Map => JMap, Properties, UUID}
/**
 * Sends `limit` copies of the message "yo" to the "continue" topic through
 * a broker at localhost:9092, then closes the producer so that buffered
 * records are delivered before the program ends.
 */
object TestProducer extends App {

  /**
   * String/String producer that can push a fixed number of test messages.
   *
   * @param configs producer configuration passed to `KafkaProducer`
   */
  private class Producer(configs: JMap[String, Object] = new HashMap())
    extends KafkaProducer(configs, new StringSerializer, new StringSerializer)
    with StrictLogging {

    val limit = 1200000

    /** Sends `limit` records, logging and pausing briefly every 500 sends. */
    def insert(): Unit = {
      var count = 0
      while (count < limit) {
        val msg = new ProducerRecord[String, String]("continue", null, "yo")
        send(msg)
        count += 1
        if (count % 500 == 0) {
          logger.info(s"Sent $count msgs ...")
          // just to slow it down to be controllable
          Thread.sleep(50)
        }
      }
      logger.info(s"Sent $count msgs ...")
    }
  }

  val config = new HashMap[String, Object]()
  config.put("bootstrap.servers", "localhost:9092")

  private val producer = new Producer(config)
  // send() is asynchronous: close() waits for previously sent records to
  // complete, so without it the tail of the batch could be lost on exit.
  try producer.insert()
  finally producer.close()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment