Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Simple java junit test of an apache kafka producer (works with Kafka 0.11.0.2) (see also https://github.com/asmaier/mini-kafka)
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import org.apache.kafka.common.utils.Time;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import kafka.zk.EmbeddedZookeeper;
import static org.junit.Assert.*;
/**
 * Round-trip integration test for the Kafka producer/consumer client API.
 *
 * <p>The test starts an embedded ZooKeeper and a single Kafka broker in-process,
 * creates a one-partition topic, sends one record and asserts that a consumer
 * reads the very same key and value back. All clients and servers are released
 * even when an assertion or a setup step fails, so that a failing run does not
 * leave the fixed broker port bound for subsequent tests.
 *
 * <p>For online documentation see
 * https://github.com/apache/kafka/blob/0.10.0/core/src/test/scala/unit/kafka/utils/TestUtils.scala
 * https://github.com/apache/kafka/blob/0.10.0/core/src/main/scala/kafka/admin/AdminUtils.scala
 * https://github.com/apache/kafka/blob/0.10.0/core/src/main/scala/kafka/utils/ZkUtils.scala
 * http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
 * http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
 * http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecords.html
 * http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html
 * http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html
 */
public class KafkaProducerIT {
    private static final String ZKHOST = "127.0.0.1";
    private static final String BROKERHOST = "127.0.0.1";
    private static final String BROKERPORT = "9092";
    private static final String TOPIC = "test";

    /**
     * Sends a single record to {@link #TOPIC} and verifies that it can be consumed again.
     *
     * @throws InterruptedException declared for API compatibility with earlier revisions of this test
     * @throws IOException if the temporary broker log directory cannot be created
     */
    @Test
    public void producerTest() throws InterruptedException, IOException {
        // setup Zookeeper
        EmbeddedZookeeper zkServer = new EmbeddedZookeeper();
        ZkClient zkClient = null;
        KafkaServer kafkaServer = null;
        try {
            String zkConnect = ZKHOST + ":" + zkServer.port();
            zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
            ZkUtils zkUtils = ZkUtils.apply(zkClient, false);

            // setup Broker
            Properties brokerProps = new Properties();
            brokerProps.setProperty("zookeeper.connect", zkConnect);
            brokerProps.setProperty("broker.id", "0");
            brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
            brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
            // single broker: the internal offsets topic cannot be replicated
            brokerProps.setProperty("offsets.topic.replication.factor", "1");
            KafkaConfig config = new KafkaConfig(brokerProps);
            Time mock = new MockTime();
            kafkaServer = TestUtils.createServer(config, mock);

            // create topic
            AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);

            // setup producer
            Properties producerProps = new Properties();
            producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
            producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
            producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

            // setup consumer
            Properties consumerProps = new Properties();
            consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
            consumerProps.setProperty("group.id", "group0");
            consumerProps.setProperty("client.id", "consumer0");
            consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
            consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            // to make sure the consumer starts from the beginning of the topic
            consumerProps.put("auto.offset.reset", "earliest");

            // try-with-resources closes the consumer while the broker is still running
            try (KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
                consumer.subscribe(Arrays.asList(TOPIC));

                // send message; closing the producer is what completes the pending send
                ProducerRecord<Integer, byte[]> data =
                        new ProducerRecord<>(TOPIC, 42, "test-message".getBytes(StandardCharsets.UTF_8));
                try (KafkaProducer<Integer, byte[]> producer = new KafkaProducer<Integer, byte[]>(producerProps)) {
                    producer.send(data);
                }

                // starting consumer
                ConsumerRecords<Integer, byte[]> records = consumer.poll(5000);
                assertEquals(1, records.count());
                Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator();
                ConsumerRecord<Integer, byte[]> record = recordIterator.next();

                // decode the payload before logging: formatting a byte[] with %s only prints its identity
                String value = new String(record.value(), StandardCharsets.UTF_8);
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), value);
                assertEquals(42, (int) record.key());
                assertEquals("test-message", value);
            }
        } finally {
            // tear down in reverse order of creation, regardless of the test outcome
            if (kafkaServer != null) {
                kafkaServer.shutdown();
            }
            if (zkClient != null) {
                zkClient.close();
            }
            zkServer.shutdown();
        }
    }
}
@fjavieralba
Copy link

fjavieralba commented Dec 9, 2013

hi,

what version of kafka are you using? I cannot compile this code with Kafka 0.8

@machielg
Copy link

machielg commented Jun 9, 2014

Great, thanks.

@lakshmikr
Copy link

lakshmikr commented Nov 10, 2014

Hi,
I am using Kafka 0.8.1.1 and I am also not able to compile. Please give the correct version of Kafka and also the corresponding POM configuration.

@ViniciusMiana
Copy link

ViniciusMiana commented Nov 25, 2014

You need to use the classifier test with kafka. In gradle:

compile("org.apache.kafka:kafka_2.10:0.8.1.1"){
        exclude group: "com.sun.jmx", module: "jmxri"
        exclude group: "com.sun.jdmk", module: "jmxtools"
        exclude group: "javax.jms", module: "jms"

    }
    testCompile "com.101tec:zkclient:0.4"
    testCompile ("org.apache.kafka:kafka_2.10:0.8.1.1:test") {
        exclude group: "com.sun.jmx", module: "jmxri"
        exclude group: "com.sun.jdmk", module: "jmxtools"
        exclude group: "javax.jms", module: "jms"   
    }

@ViniciusMiana
Copy link

ViniciusMiana commented Nov 25, 2014

And fix this part:

   // create topic
      AdminUtils.createTopic(zkClient, topic, 1, 1, new Properties());


    List<KafkaServer> servers = new ArrayList<KafkaServer>();
    servers.add(kafkaServer);
    TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), TOPIC, 0, 5000);

@alexnederlof
Copy link

alexnederlof commented Mar 1, 2015

Or look at how others use the test version by doing this github search

@curtisallen
Copy link

curtisallen commented Jun 30, 2015

Thanks for this!
you can now create a topic by

// create topic
AdminUtils.createTopic(zkClient, topic, 1, 1, new Properties());

@asmaier
Copy link
Author

asmaier commented Nov 20, 2015

I updated the example right now to work with Kafka 0.8.2.2. As dependencies I use

<dependencies>
<dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>4.12</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.8.2.2</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.8.2.2</version>
  <classifier>test</classifier>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.8.2.2</version>
</dependency>
</dependencies>

To see more output you can also add a log4j.properties file to your test resources with the following content

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F#%M:%L) - %m%n

log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
log4j.logger.org.apache.zookeeper=WARN
log4j.logger.kafka=INFO

@asmaier
Copy link
Author

asmaier commented Nov 23, 2015

I added some code to also consume the message sent.

@jonrodriguezb
Copy link

jonrodriguezb commented Dec 15, 2015

Hi,

I'm trying to run this example but I can't. I'm getting the following error: java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;

It seems that something is happening with Scala.. but I don't know how to fix it. The error comes up in the line of the MockTime.

This is my pom.xml file (dependencies):

org.apache.kafka kafka_2.10 0.8.2.2 junit junit 4.4

Help me please!

@adeel-shahzad
Copy link

adeel-shahzad commented Feb 16, 2016

perfect

@stanley-shi
Copy link

stanley-shi commented Mar 22, 2016

Thanks, this is really helpful!

@asmaier
Copy link
Author

asmaier commented Jul 1, 2016

I updated the example right now to work with Kafka 0.10.0.0 using the new producer and consumer API. As dependencies I use now

<dependencies>
 <dependency>
   <groupId>junit</groupId>
   <artifactId>junit</artifactId>
   <version>4.12</version>
 </dependency>
 <dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.10.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.10.0.0</version>
  <classifier>test</classifier>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.0.0</version>
  <classifier>test</classifier>
  <scope>test</scope>
</dependency>
</dependencies>

@topicscout
Copy link

topicscout commented Jul 1, 2016

Hi,

I just tried the 0.10.0.0 version recently posted. It compiled and ran under JDK 8 on a Mac. Ran tests within IntelliJ.
Test passed. But there were some logged exceptions, and I would just like to know if those exceptions are expected
as part of the test or not.

And thanks again for this! Much needed!

regards,

Stefan

[main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
[main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server environment:host.name=192.168.2.227
[main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server environment:java.version=1.8.0_91
[main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server environment:java.vendor=Oracle Corporation
[main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.io.tmpdir=/var/folders/81/zg01zvv52p79rfzbfxcdkhym0000gn/T/
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.compiler=
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.name=Mac OS X
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.arch=x86_64
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.version=10.11.1
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.name=xxxxxxx
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.home=xxxxxxxxxxxxxx
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.dir=/Users/xxxxxxx/dev
[main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=127.0.0.1:64411 sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@2641e737
[main-SendThread(127.0.0.1:64411)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:64411. Will not attempt to authenticate using SASL (unknown error)
[main-SendThread(127.0.0.1:64411)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:64411, initiating session
[NIOServerCxn.Factory:/127.0.0.1:0] INFO org.apache.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:64413
[NIOServerCxn.Factory:/127.0.0.1:0] INFO org.apache.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:64413
[SyncThread:0] INFO org.apache.zookeeper.server.persistence.FileTxnLog - Creating new log file: log.1
[SyncThread:0] INFO org.apache.zookeeper.server.ZooKeeperServer - Established session 0x155a7c9cb2e0000 with negotiated timeout 10000 for client /127.0.0.1:64413
[main-SendThread(127.0.0.1:64411)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:64411, sessionid = 0x155a7c9cb2e0000, negotiated timeout = 10000
[main] INFO kafka.server.KafkaConfig - KafkaConfig values:
advertised.host.name = null
metric.reporters = []
quota.producer.default = 9223372036854775807
offsets.topic.num.partitions = 50
log.flush.interval.messages = 9223372036854775807
auto.create.topics.enable = true
controller.socket.timeout.ms = 30000
log.flush.interval.ms = null
principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
replica.socket.receive.buffer.bytes = 65536
min.insync.replicas = 1
replica.fetch.wait.max.ms = 500
num.recovery.threads.per.data.dir = 1
ssl.keystore.type = JKS
sasl.mechanism.inter.broker.protocol = GSSAPI
default.replication.factor = 1
ssl.truststore.password = null
log.preallocate = false
sasl.kerberos.principal.to.local.rules = [DEFAULT]
fetch.purgatory.purge.interval.requests = 1000
ssl.endpoint.identification.algorithm = null
replica.socket.timeout.ms = 30000
message.max.bytes = 1000012
num.io.threads = 8
offsets.commit.required.acks = -1
log.flush.offset.checkpoint.interval.ms = 60000
delete.topic.enable = false
quota.window.size.seconds = 1
ssl.truststore.type = JKS
offsets.commit.timeout.ms = 5000
quota.window.num = 11
zookeeper.connect = 127.0.0.1:64411
authorizer.class.name =
num.replica.fetchers = 1
log.retention.ms = null
log.roll.jitter.hours = 0
log.cleaner.enable = true
offsets.load.buffer.size = 5242880
log.cleaner.delete.retention.ms = 86400000
ssl.client.auth = none
controlled.shutdown.max.retries = 3
queued.max.requests = 500
offsets.topic.replication.factor = 3
log.cleaner.threads = 1
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
socket.request.max.bytes = 104857600
ssl.trustmanager.algorithm = PKIX
zookeeper.session.timeout.ms = 6000
log.retention.bytes = -1
log.message.timestamp.type = CreateTime
sasl.kerberos.min.time.before.relogin = 60000
zookeeper.set.acl = false
connections.max.idle.ms = 600000
offsets.retention.minutes = 1440
replica.fetch.backoff.ms = 1000
inter.broker.protocol.version = 0.10.0-IV1
log.retention.hours = 168
num.partitions = 1
broker.id.generation.enable = true
listeners = PLAINTEXT://127.0.0.1:9092
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
log.roll.ms = null
log.flush.scheduler.interval.ms = 9223372036854775807
ssl.cipher.suites = null
log.index.size.max.bytes = 10485760
ssl.keymanager.algorithm = SunX509
security.inter.broker.protocol = PLAINTEXT
replica.fetch.max.bytes = 1048576
advertised.port = null
log.cleaner.dedupe.buffer.size = 134217728
replica.high.watermark.checkpoint.interval.ms = 5000
log.cleaner.io.buffer.size = 524288
sasl.kerberos.ticket.renew.window.factor = 0.8
zookeeper.connection.timeout.ms = null
controlled.shutdown.retry.backoff.ms = 5000
log.roll.hours = 168
log.cleanup.policy = delete
host.name =
log.roll.jitter.ms = null
max.connections.per.ip = 2147483647
offsets.topic.segment.bytes = 104857600
background.threads = 10
quota.consumer.default = 9223372036854775807
request.timeout.ms = 30000
log.message.format.version = 0.10.0-IV1
log.index.interval.bytes = 4096
log.dir = /tmp/kafka-logs
log.segment.bytes = 1073741824
log.cleaner.backoff.ms = 15000
offset.metadata.max.bytes = 4096
ssl.truststore.location = null
group.max.session.timeout.ms = 300000
ssl.keystore.password = null
zookeeper.sync.time.ms = 2000
port = 9092
log.retention.minutes = null
log.segment.delete.delay.ms = 60000
log.dirs = /var/folders/81/zg01zvv52p79rfzbfxcdkhym0000gn/T/kafka-7807231158003385273
controlled.shutdown.enable = true
compression.type = producer
max.connections.per.ip.overrides =
log.message.timestamp.difference.max.ms = 9223372036854775807
sasl.kerberos.kinit.cmd = /usr/bin/kinit
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
auto.leader.rebalance.enable = true
leader.imbalance.check.interval.seconds = 300
log.cleaner.min.cleanable.ratio = 0.5
replica.lag.time.max.ms = 10000
num.network.threads = 3
ssl.key.password = null
reserved.broker.max.id = 1000
metrics.num.samples = 2
socket.send.buffer.bytes = 102400
ssl.protocol = TLS
socket.receive.buffer.bytes = 102400
ssl.keystore.location = null
replica.fetch.min.bytes = 1
broker.rack = null
unclean.leader.election.enable = true
sasl.enabled.mechanisms = [GSSAPI]
group.min.session.timeout.ms = 6000
log.cleaner.io.buffer.load.factor = 0.9
offsets.retention.check.interval.ms = 600000
producer.purgatory.purge.interval.requests = 1000
metrics.sample.window.ms = 30000
broker.id = 0
offsets.topic.compression.codec = 0
log.retention.check.interval.ms = 300000
advertised.listeners = null
leader.imbalance.per.broker.percentage = 10

[main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=127.0.0.1:64411 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@15ca7889
[main-SendThread(127.0.0.1:64411)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:64411. Will not attempt to authenticate using SASL (unknown error)
[main-SendThread(127.0.0.1:64411)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:64411, initiating session
[NIOServerCxn.Factory:/127.0.0.1:0] INFO org.apache.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:64414
[NIOServerCxn.Factory:/127.0.0.1:0] INFO org.apache.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:64414
[SyncThread:0] INFO org.apache.zookeeper.server.ZooKeeperServer - Established session 0x155a7c9cb2e0001 with negotiated timeout 6000 for client /127.0.0.1:64414
[main-SendThread(127.0.0.1:64411)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:64411, sessionid = 0x155a7c9cb2e0001, negotiated timeout = 6000
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x5 zxid:0x4 txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NoNode for /brokers
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xb zxid:0x8 txntype:-1 reqpath:n/a Error Path:/config Error:KeeperErrorCode = NoNode for /config
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x13 zxid:0xd txntype:-1 reqpath:n/a Error Path:/admin Error:KeeperErrorCode = NoNode for /admin
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:setData cxid:0x21 zxid:0x13 txntype:-1 reqpath:n/a Error Path:/controller_epoch Error:KeeperErrorCode = NoNode for /controller_epoch
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:delete cxid:0x30 zxid:0x15 txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x37 zxid:0x16 txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x38 zxid:0x17 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : b8642491e78c5a13
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0000 type:setData cxid:0x4 zxid:0x19 txntype:-1 reqpath:n/a Error Path:/config/topics/test Error:KeeperErrorCode = NoNode for /config/topics/test
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0000 type:create cxid:0x5 zxid:0x1a txntype:-1 reqpath:n/a Error Path:/config/topics Error:KeeperErrorCode = NodeExists for /config/topics
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [127.0.0.1:9092]
ssl.keystore.type = JKS
sasl.mechanism = GSSAPI
max.block.ms = 60000
interceptor.classes = null
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
request.timeout.ms = 30000
acks = 1
receive.buffer.bytes = 32768
ssl.truststore.type = JKS
retries = 0
ssl.truststore.location = null
ssl.keystore.password = null
send.buffer.bytes = 131072
compression.type = none
metadata.fetch.timeout.ms = 60000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
batch.size = 16384
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
max.request.size = 1048576
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
linger.ms = 0

[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x42 zxid:0x1d txntype:-1 reqpath:n/a Error Path:/brokers/topics/test/partitions/0 Error:KeeperErrorCode = NoNode for /brokers/topics/test/partitions/0
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [127.0.0.1:9092]
ssl.keystore.type = JKS
sasl.mechanism = GSSAPI
max.block.ms = 60000
interceptor.classes = null
ssl.truststore.password = null
client.id = producer-1
ssl.endpoint.identification.algorithm = null
request.timeout.ms = 30000
acks = 1
receive.buffer.bytes = 32768
ssl.truststore.type = JKS
retries = 0
ssl.truststore.location = null
ssl.keystore.password = null
send.buffer.bytes = 131072
compression.type = none
metadata.fetch.timeout.ms = 60000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
batch.size = 16384
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
max.request.size = 1048576
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
linger.ms = 0

[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x43 zxid:0x1e txntype:-1 reqpath:n/a Error Path:/brokers/topics/test/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/test/partitions
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : b8642491e78c5a13
[main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [127.0.0.1:9092]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id = consumer0
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id = group0
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.IntegerDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = earliest

[main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [127.0.0.1:9092]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id = consumer0
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id = group0
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.IntegerDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = earliest

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : b8642491e78c5a13
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 0 : {test=LEADER_NOT_AVAILABLE}
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:setData cxid:0x4f zxid:0x22 txntype:-1 reqpath:n/a Error Path:/config/topics/__consumer_offsets Error:KeeperErrorCode = NoNode for /config/topics/__consumer_offsets
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x50 zxid:0x23 txntype:-1 reqpath:n/a Error Path:/config/topics Error:KeeperErrorCode = NodeExists for /config/topics
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x89 zxid:0x26 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/32 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/32
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x8a zxid:0x27 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x8e zxid:0x2b txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/16 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/16
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x91 zxid:0x2e txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/49 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/49
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x94 zxid:0x31 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/44 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/44
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x97 zxid:0x34 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/28 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/28
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x9a zxid:0x37 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/17 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/17
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x9d zxid:0x3a txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/23 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/23
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xa0 zxid:0x3d txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/7 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/7
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xa3 zxid:0x40 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/4 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/4
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xa6 zxid:0x43 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/29 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/29
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xa9 zxid:0x46 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/35 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/35
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xac zxid:0x49 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/3 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/3
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xaf zxid:0x4c txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/24 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/24
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xb2 zxid:0x4f txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/41 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/41
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xb5 zxid:0x52 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/0 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/0
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xb8 zxid:0x55 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/38 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/38
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xbb zxid:0x58 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/13 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/13
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xbe zxid:0x5b txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/8 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/8
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xc1 zxid:0x5e txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/5 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/5
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xc4 zxid:0x61 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/39 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/39
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xc7 zxid:0x64 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/36 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/36
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xca zxid:0x67 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/40 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/40
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xcd zxid:0x6a txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/45 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/45
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xd0 zxid:0x6d txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/15 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/15
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xd3 zxid:0x70 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/33 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/33
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xd6 zxid:0x73 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/37 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/37
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xd9 zxid:0x76 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/21 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/21
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xdc zxid:0x79 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/6 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/6
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xdf zxid:0x7c txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/11 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/11
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xe2 zxid:0x7f txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/20 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/20
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xe5 zxid:0x82 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/47 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/47
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xe8 zxid:0x85 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/2 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/2
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xeb zxid:0x88 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/27 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/27
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xee zxid:0x8b txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/34 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/34
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xf1 zxid:0x8e txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/9 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/9
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xf4 zxid:0x91 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/22 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/22
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xf7 zxid:0x94 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/42 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/42
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xfa zxid:0x97 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/14 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/14
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xfd zxid:0x9a txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/25 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/25
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x100 zxid:0x9d txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/10 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/10
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x103 zxid:0xa0 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/48 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/48
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x106 zxid:0xa3 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/31 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/31
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x109 zxid:0xa6 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/18 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/18
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x10c zxid:0xa9 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/19 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/19
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x10f zxid:0xac txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/12 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/12
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x112 zxid:0xaf txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/46 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/46
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x115 zxid:0xb2 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/43 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/43
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x118 zxid:0xb5 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/1 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/1
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x11b zxid:0xb8 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/26 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/26
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x11e zxid:0xbb txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/30 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/30
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator 127.0.0.1:9092 (id: 2147483647 rack: null) for group group0.
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [] for group group0
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group group0
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Successfully joined group group0 with generation 1
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Setting newly assigned partitions [test-0] for group group0
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x155a7c9cb2e0001
[main] INFO org.apache.zookeeper.ZooKeeper - Session: 0x155a7c9cb2e0001 closed
[main-EventThread] INFO org.apache.zookeeper.ClientCnxn - EventThread shut down
[NIOServerCxn.Factory:/127.0.0.1:0] INFO org.apache.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:64414 which had sessionid 0x155a7c9cb2e0001
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x155a7c9cb2e0000
[main] INFO org.apache.zookeeper.ZooKeeper - Session: 0x155a7c9cb2e0000 closed
[main-EventThread] INFO org.apache.zookeeper.ClientCnxn - EventThread shut down
[NIOServerCxn.Factory:/127.0.0.1:0] INFO org.apache.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:64413 which had sessionid 0x155a7c9cb2e0000
[main] INFO org.apache.zookeeper.server.ZooKeeperServer - shutting down
[main] INFO org.apache.zookeeper.server.SessionTrackerImpl - Shutting down
[main] INFO org.apache.zookeeper.server.PrepRequestProcessor - Shutting down
[main] INFO org.apache.zookeeper.server.SyncRequestProcessor - Shutting down
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - PrepRequestProcessor exited loop!
[SyncThread:0] INFO org.apache.zookeeper.server.SyncRequestProcessor - SyncRequestProcessor exited!
[main] INFO org.apache.zookeeper.server.FinalRequestProcessor - shutdown of request processor complete
[NIOServerCxn.Factory:/127.0.0.1:0] INFO org.apache.zookeeper.server.NIOServerCnxnFactory - NIOServerCnxn factory exited run method
[main] INFO org.apache.zookeeper.server.ZooKeeperServer - shutting down
[main] INFO org.apache.zookeeper.server.SessionTrackerImpl - Shutting down
[main] INFO org.apache.zookeeper.server.PrepRequestProcessor - Shutting down
[main] INFO org.apache.zookeeper.server.SyncRequestProcessor - Shutting down
[main] INFO org.apache.zookeeper.server.FinalRequestProcessor - shutdown of request processor complete
offset = 0, key = 42, value = [B@4f6f416f
Process finished with exit code 0

@srinidhis94
Copy link

srinidhis94 commented Nov 25, 2016

Hi,
I am getting assertion error during
ConsumerRecords<Integer, byte[]> records = consumer.poll(1000);
assertEquals(1, records.count());

So basically the consumer is not receiving any records. What might be the reason, and how can I fix this?

@an247
Copy link

an247 commented Dec 8, 2016

Thanks for this.
@srinidhis94 and others who may have faced the same issue with the consumer not reading anything, simply bumping up the poll timeout value from 1000 should work.

@shineedbasheer
Copy link

shineedbasheer commented May 19, 2017

I am also getting same assertion error during

 ConsumerRecords<Integer, byte[]> records = consumer.poll(1000);
assertEquals(1, records.count());

Same as @srinidhis94.

I tried what @an247 suggested and increased the poll timeout, but that did not solve my problem.

@abtpst
Copy link

abtpst commented Oct 24, 2017

hi @asmaier, thank you so much for this. i have used this successfully in the past. does the code work for kafka 0.10.1.0?

recently I have been getting

java.lang.NoSuchFieldError: DEFAULT_SASL_ENABLED_MECHANISMS
    at kafka.server.Defaults$.<init>(KafkaConfig.scala:183)
    at kafka.server.Defaults$.<clinit>(KafkaConfig.scala)
    at kafka.log.Defaults$.<init>(LogConfig.scala:35)
    at kafka.log.Defaults$.<clinit>(LogConfig.scala)
    at kafka.log.LogConfig$.<init>(LogConfig.scala:246)
    at kafka.log.LogConfig$.<clinit>(LogConfig.scala)
    at kafka.server.KafkaConfig$.<init>(KafkaConfig.scala:270)
    at kafka.server.KafkaConfig$.<clinit>(KafkaConfig.scala)
    at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:795)
    at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:797)
    at com.ibm.whi.bap.helper.test.kafka.KafkaServerTest.<init>(KafkaServerTest.java:56)
    at com.ibm.whi.bap.helper.test.kafka.KafkaTest.checkAllProperties(KafkaTest.java:115)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
    at org.junit.rules.TestWatchman$1.evaluate(TestWatchman.java:48)
    at org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)

java.lang.NullPointerException
    at com.ibm.whi.bap.helper.test.kafka.KafkaTest.tearDown(KafkaTest.java:58)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:37)
    at org.junit.rules.TestWatchman$1.evaluate(TestWatchman.java:48)
    at org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)

at

KafkaConfig config = new KafkaConfig(brokerProps);

I did not change anything else. Any idea what might be wrong? Here are my Maven dependencies:

       <!-- kafka-clients 0.10.1.0, "test" classifier artifact, available in test scope only. -->
       <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.1.0</version>
            <classifier>test</classifier>
            <scope>test</scope>
        </dependency>

        <!-- Kafka broker (Scala 2.11 build) 0.10.1.0, "test" classifier artifact, test scope only. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.0</version>
            <classifier>test</classifier>
            <scope>test</scope>
        </dependency>

@asmaier
Copy link
Author

asmaier commented Dec 10, 2017

I have just updated the example to work with Kafka 0.11.0.2. I also increased the poll timeout to 5000 ms. I now use the following dependencies:

<!-- Dependencies used to run the example against Kafka 0.11.0.2. -->
<dependencies>
    <!-- JUnit 4 test framework. -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
    </dependency>
    <!-- Kafka broker, Scala 2.11 build (main artifact). -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.11.0.2</version>
    </dependency>
    <!-- Kafka broker "test" classifier artifact, test scope only. -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.11.0.2</version>
      <classifier>test</classifier>
      <scope>test</scope>
    </dependency>
    <!-- Kafka producer/consumer client library (main artifact). -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.11.0.2</version>
    </dependency>
    <!-- Kafka client "test" classifier artifact, test scope only. -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.11.0.2</version>
      <classifier>test</classifier>
      <scope>test</scope>
    </dependency>
  </dependencies>

@asubb
Copy link

asubb commented Oct 11, 2018

Gradle and Kotlin example based on this implementation.

build.gradle dependencies section:

    // Versions shared by the coordinates below. `def` declares a local variable; a bare
    // `ver = ...` would try to assign an undeclared property and fail the build.
    def ver = [junit: '4.12', kafka: '1.1.1']
    testImplementation "junit:junit:$ver.junit"
    // Kafka client library: main artifact for production code, "test" classifier for tests.
    implementation "org.apache.kafka:kafka-clients:$ver.kafka"
    testImplementation "org.apache.kafka:kafka-clients:$ver.kafka:test"
    // Kafka broker (Scala 2.11 build): main and "test" classifier artifacts, tests only.
    testImplementation "org.apache.kafka:kafka_2.11:$ver.kafka"
    testImplementation "org.apache.kafka:kafka_2.11:$ver.kafka:test"

KafkaEmbedded.kt

/**
 * Embedded single-broker Kafka instance backed by an embedded ZooKeeper, intended for tests.
 *
 * Constructing an instance starts ZooKeeper and the broker, creates [topic] with one
 * partition and a replication factor of 1, and waits up to 5000 ms for the topic metadata
 * to be propagated. If any of these steps throws, everything started so far is shut down
 * before the exception is rethrown, because the caller never receives an instance it
 * could [close].
 *
 * @param port port of the broker's PLAINTEXT listener on 127.0.0.1
 * @param topic name of the topic created during startup
 */
class KafkaEmbedded(port: Int, topic: String) : Closeable {

    private val server: KafkaServer
    private val zkClient: ZkClient
    private val zkServer: EmbeddedZookeeper

    init {
        zkServer = EmbeddedZookeeper()

        // Track what has been created so far so the catch block can release it.
        var startedServer: KafkaServer? = null
        var openedClient: ZkClient? = null
        try {
            val zkConnect = "127.0.0.1:${zkServer.port()}"

            val props = Properties()
            props.setProperty("zookeeper.connect", zkConnect)
            props.setProperty("broker.id", "0")
            props.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString())
            props.setProperty("listeners", "PLAINTEXT://127.0.0.1:$port")
            // Single broker: the internal offsets topic cannot be replicated more than once.
            props.setProperty("offsets.topic.replication.factor", "1")

            val broker = KafkaServer(
                KafkaConfig(props),
                Time.SYSTEM,
                scala.Option.apply("kafka-broker"),
                scala.collection.JavaConversions.asScalaBuffer(emptyList())
            )
            startedServer = broker
            broker.startup()

            val client = ZkClient(zkConnect, 30000, 30000, `ZKStringSerializer$`.`MODULE$`)
            openedClient = client
            val zkUtils = ZkUtils.apply(client, false)
            AdminUtils.createTopic(zkUtils, topic, 1, 1, Properties(), RackAwareMode.`Disabled$`.`MODULE$`)

            TestUtils.waitUntilMetadataIsPropagated(
                scala.collection.JavaConversions.asScalaBuffer(listOf(broker)), topic, 0, 5000
            )

            server = broker
            zkClient = client
        } catch (e: Throwable) {
            // Best-effort cleanup in the same order as close(); secondary failures are
            // attached to the original exception instead of replacing it.
            try {
                startedServer?.let {
                    it.shutdown()
                    it.awaitShutdown()
                }
            } catch (cleanupFailure: Throwable) {
                e.addSuppressed(cleanupFailure)
            }
            try {
                openedClient?.close()
            } catch (cleanupFailure: Throwable) {
                e.addSuppressed(cleanupFailure)
            }
            try {
                zkServer.shutdown()
            } catch (cleanupFailure: Throwable) {
                e.addSuppressed(cleanupFailure)
            }
            throw e
        }
    }

    /**
     * Shuts down the broker, then the ZooKeeper client, then the embedded ZooKeeper server.
     * Later steps still run when an earlier one throws.
     */
    override fun close() {
        try {
            server.shutdown()
            server.awaitShutdown()
        } finally {
            try {
                zkClient.close()
            } finally {
                zkServer.shutdown()
            }
        }
    }
}

Usage example:

// Starts the embedded broker on port 12345 with a topic named "test".
// `use` calls close() on the instance when the block exits.
KafkaEmbedded(12345, "test").use {
   // produce-consume-assert
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment