Skip to content

Instantly share code, notes, and snippets.

@samuelgmartinez
Last active August 29, 2015 14:05
Show Gist options
  • Save samuelgmartinez/1b3588b739e60db3d492 to your computer and use it in GitHub Desktop.
Save samuelgmartinez/1b3588b739e60db3d492 to your computer and use it in GitHub Desktop.
Dummy Consumers rebalancing
package com.testing.kafka;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Test;
/**
 * Manual integration test for consumer-group rebalancing with the Kafka high-level consumer.
 *
 * <p>The test creates two consumer connectors in the same group ({@code mygroup2}), connecting
 * through ZooKeeper at {@code localhost:2181}. Each connector opens two streams on the topic
 * {@code multi1}, and every stream is drained by its own {@link MyConsumer} task. After a fixed
 * wait the connectors are shut down and the per-task message counts are checked.
 *
 * <p>The topic must already hold enough consumable messages for every task to receive some
 * while the test is sleeping.
 *
 * @author samuelgmartinez
 */
public class ConsumerRebalanceTest {

    private static final Logger log = Logger.getLogger(ConsumerRebalanceTest.class);

    /** Topic every stream in this test reads from. */
    private static final String TOPIC = "multi1";

    /**
     * Starts the connectors one after another, lets them consume for a while, shuts everything
     * down and asserts that both connectors received messages.
     *
     * @throws Exception if the setup fails, the test thread is interrupted, or a consumer task
     *         terminates with an exception
     */
    @Test
    public void testRebalance() throws Exception {
        int consumers = 2;
        int tpoolSize = 2;
        ExecutorService executor = Executors.newFixedThreadPool(tpoolSize * consumers);
        List<Future<Result>> futures = new ArrayList<Future<Result>>(tpoolSize * consumers);
        List<ConsumerConnector> connectors = new ArrayList<ConsumerConnector>(tpoolSize * consumers);
        AtomicBoolean running = new AtomicBoolean(true);

        try {
            for (int c = 1; c <= consumers; c++) {
                ConsumerConnector connector =
                        Consumer.createJavaConsumerConnector(consumerConfig(c));
                // Register before doing anything else so the finally block can always clean it up.
                connectors.add(connector);

                Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =
                        connector.createMessageStreams(
                                ImmutableMap.of(TOPIC, Integer.valueOf(tpoolSize)));
                List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get(TOPIC);

                for (int i = 1; i <= tpoolSize; i++) {
                    futures.add(executor.submit(
                            new MyConsumer("instance-" + c + "-" + i, streams.get(i - 1), running)));
                }

                // To be able to sleep here, the topic must have a bunch of consumable msgs.
                Thread.sleep(2000);
            }
            Thread.sleep(10000);
        } finally {
            // Runs on failure too: otherwise the connectors stay open and the pool's
            // non-daemon worker threads are never stopped.
            log.info("shutting down...");
            running.set(false);
            try {
                shutdownAll(connectors);
            } finally {
                executor.shutdown();
            }
        }

        Map<String, Result> results = new HashMap<String, Result>(consumers * tpoolSize);
        for (Future<Result> future : futures) {
            Result result = future.get();
            results.put(result.id, result);
            log.info(result.id + " consumed " + result.count);
        }

        // The first connector starts alone, so both of its streams must have consumed something.
        assertConsumed(results.get("instance-1-1"));
        assertConsumed(results.get("instance-1-2"));

        // If the rebalance worked correctly, the second connector took over part of the topic.
        Result second1 = results.get("instance-2-1");
        Result second2 = results.get("instance-2-2");
        Assert.assertTrue(
                "second connector consumed nothing (instance-2-1=" + second1.count
                        + ", instance-2-2=" + second2.count + ")",
                second1.count > 0 || second2.count > 0);
    }

    /**
     * Builds the configuration for the connector with the given 1-based index.
     *
     * @param index connector index, used to derive a distinct {@code consumer.id}
     * @return consumer configuration pointing at the local ZooKeeper and the shared group
     */
    private static ConsumerConfig consumerConfig(int index) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "mygroup2");
        // Just for testing: fixed ids make the instances easy to tell apart.
        props.put("consumer.id", "instance" + index);
        return new ConsumerConfig(props);
    }

    /**
     * Commits offsets on and shuts down every connector, even if one of them fails.
     *
     * @param connectors connectors to stop
     * @throws RuntimeException the first failure seen, rethrown once all connectors were handled
     */
    private static void shutdownAll(List<ConsumerConnector> connectors) {
        RuntimeException firstFailure = null;
        for (ConsumerConnector connector : connectors) {
            try {
                try {
                    connector.commitOffsets();
                } finally {
                    // NOTE(review): shutdown presumably also unblocks tasks waiting in
                    // hasNext() - confirm against the Kafka consumer documentation.
                    connector.shutdown();
                }
            } catch (RuntimeException e) {
                log.error("failed to stop a consumer connector cleanly", e);
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /** Asserts that the given task consumed at least one message. */
    private static void assertConsumed(Result result) {
        Assert.assertTrue(result.id + " consumed no messages", result.count > 0);
    }

    /**
     * Task that drains one stream and counts the messages it receives until the shared
     * {@code running} flag is cleared or the stream reports no further messages.
     */
    public static class MyConsumer implements Callable<Result> {

        private final KafkaStream<byte[], byte[]> stream;
        private final AtomicBoolean running;
        private final String id;

        /**
         * @param id identifier reported in log lines and in the returned {@link Result}
         * @param stream stream to read messages from
         * @param running shared flag; consumption stops once it becomes {@code false}
         */
        public MyConsumer(String id, KafkaStream<byte[], byte[]> stream, AtomicBoolean running) {
            this.stream = stream;
            this.id = id;
            this.running = running;
        }

        /**
         * Consumes messages, pausing 10 ms after each one, and logs progress every 200 messages.
         *
         * @return the task id together with the number of messages consumed
         * @throws Exception if the thread is interrupted while pausing or the iterator fails
         */
        @Override
        public Result call() throws Exception {
            int consumed = 0;
            log.trace(Thread.currentThread().getName() + " - " + this.id + " - starting to consume msgs");
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (running.get() && it.hasNext()) {
                // Only the count matters here; the payload is intentionally discarded.
                it.next();
                consumed++;
                if (consumed % 200 == 0) {
                    log.info(Thread.currentThread().getName()
                            + " - " + this.id + " - " + consumed + " msgs consumed");
                }
                Thread.sleep(10);
            }
            log.trace(Thread.currentThread().getName() + " - " + this.id + " is over");
            return new Result(id, consumed);
        }
    }

    /** Immutable outcome of one {@link MyConsumer} task. */
    private static class Result {

        /** Identifier of the task that produced this result. */
        final String id;

        /** Number of messages the task consumed. */
        final int count;

        public Result(String id, int count) {
            this.id = id;
            this.count = count;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment