import java.util.concurrent.{Executors, TimeUnit}
import java.util.{Collection, Properties}

import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.WakeupException

import scala.collection.JavaConversions._
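
// Two instances of the new (0.9) KafkaConsumer running in one process, with
// auto-commit disabled: offsets are committed manually every `flushNumMsg`
// messages and on partition revocation via the ConsumerRebalanceListener,
// so a rebalance does not lose committed progress.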
class Consumer(threadId: String, topic: String, configs: Properties) extends Runnable {

  val name = s"$topic-$threadId"

  val listener = new ConsumerRebalanceListener {
    def onPartitionsAssigned(partitions: Collection[TopicPartition]): Unit = {
      println(s"$name got assigned ${partitions.toList}")
    }

    def onPartitionsRevoked(partitions: Collection[TopicPartition]): Unit = {
      if (partitions.isEmpty) {
        return
      }
      // commit before the revoked partitions are handed to another consumer,
      // so the new owner resumes from the latest processed position
      kafkaConsumer.commitSync()
      println(s"$name committed offset for ${partitions.toList}")
    }
  }

  // flush and commit every 80000 msgs
  val flushNumMsg = 80000

  // simulate a local state
  var count = 0L

  val kafkaConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](configs)
  kafkaConsumer.subscribe(List(topic), listener)

  def run(): Unit = {
    try {
      while (true) {
        try {
          val records = kafkaConsumer.poll(1000)
          // slow processing down a little so the test stays observable
          Thread.sleep(20)
          for (msg <- records) {
            count += 1
            if (count % flushNumMsg == 0) {
              commitOffset()
            }
            if (count % 5000 == 0) {
              println(s"$name fetched $count ...")
            }
          }
        } catch {
          case e: WakeupException =>
            // wakeup() was called: let it propagate to trigger the shutdown path
            throw e
          case e: Exception =>
            println("Error during processing of the message: " + e)
        }
      }
    } catch {
      case e: WakeupException =>
        // expected on shutdown, nothing to do
    } finally {
      // last commit before leaving the group, then release the consumer
      commitOffset()
      kafkaConsumer.close()
      println(count)
      import java.io.{File, PrintWriter}
      // dump the final count per thread (assumes /tmp/track already exists)
      val p = new PrintWriter(new File(s"/tmp/track/${Thread.currentThread().getName}"))
      p.write(s"$count\n")
      p.close()
    }
  }

  def commitOffset(): Unit = {
    try {
      kafkaConsumer.commitSync()
    } catch {
      case e: WakeupException =>
        // retry the commit, then rethrow for shutdown
        kafkaConsumer.commitSync()
        throw e
    }
  }
}
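
// Aside: commitSync() with no arguments commits the last polled offsets for
// every assigned partition. The 0.9 API also accepts explicit offsets when
// finer control is wanted; a minimal sketch (the helper below is hypothetical,
// not part of the original gist):
object CommitExplicitly {
  import java.util.Collections
  import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
  import org.apache.kafka.common.TopicPartition

  def commitUpTo(consumer: KafkaConsumer[Array[Byte], Array[Byte]],
                 tp: TopicPartition,
                 lastProcessed: Long): Unit = {
    // the committed offset is the position of the next record to consume,
    // hence lastProcessed + 1
    consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastProcessed + 1)))
  }
}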

object Test extends App {
  val topic = "continue"
  val groupId = s"$topic-test"

  val props = new Properties()
  // note: the zookeeper.* entries below are old-consumer (0.8) settings that
  // the new 0.9 KafkaConsumer ignores; auto.commit.interval.ms is also moot
  // since auto-commit is disabled further down
  props.put("zookeeper.connect", "localhost:2181")
  props.put("group.id", groupId)
  props.put("zookeeper.session.timeout.ms", "400")
  props.put("zookeeper.sync.time.ms", "200")
  props.put("auto.commit.interval.ms", "1000")
  // kafka 0.9.0.0
  props.put("bootstrap.servers", "localhost:9092")
  props.put("auto.offset.reset", "earliest")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  // disable auto commit; offsets are committed manually in Consumer
  props.put("enable.auto.commit", "false")

  // launch 2 consumers in this process
  val executor = Executors.newFixedThreadPool(2)
  val consumers = List(
    new Consumer("0", topic, props),
    new Consumer("1", topic, props)
  )
  consumers.foreach(executor.submit(_))

  // gracefully terminate the consumer threads: wakeup() is the only
  // KafkaConsumer method that is safe to call from another thread; it makes
  // the blocked poll() throw a WakeupException inside the consumer thread
  Runtime.getRuntime.addShutdownHook(new Thread() {
    override def run(): Unit = {
      consumers.foreach(_.kafkaConsumer.wakeup())
      executor.shutdown()
      executor.awaitTermination(10, TimeUnit.SECONDS)
    }
  })
}
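
// To actually run the test (assumptions, not spelled out in the gist): a 0.9
// broker on localhost:9092 and a topic named "continue" with at least 2
// partitions, so that both consumers in the group get an assignment.
//
// Second file of the gist: a throttled test producer feeding the topic.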
import java.util.{HashMap, Map => JMap}

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import com.typesafe.scalalogging.StrictLogging

object TestProducer extends App {

  private class Producer(configs: JMap[String, Object] = new HashMap())
      extends KafkaProducer(configs, new StringSerializer, new StringSerializer)
      with StrictLogging {

    val limit = 1200000

    def insert(): Unit = {
      var count = 0
      while (count < limit) {
        // null key, so records are distributed round-robin over the partitions
        val msg = new ProducerRecord[String, String]("continue", null, "yo")
        send(msg)
        count += 1
        if (count % 500 == 0) {
          logger.info(s"Sent $count msgs ...")
          // just to slow it down to be controllable
          Thread.sleep(50)
        }
      }
      logger.info(s"Sent $count msgs ...")
    }
  }

  val config = new HashMap[String, Object]()
  config.put("bootstrap.servers", "localhost:9092")

  val producer = new Producer(config)
  producer.insert()
  // send() is asynchronous; close() flushes what is still buffered so the
  // tail of the 1200000 messages is not lost when the JVM exits
  producer.close()
}
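
// Sanity check for the whole setup: each consumer thread writes its final
// count to /tmp/track, so the counts summed over both files should equal the
// 1200000 messages produced; a larger sum would indicate messages replayed
// after an uncommitted rebalance.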