Skip to content

Instantly share code, notes, and snippets.

@asmaier
Last active March 23, 2022 11:16
Show Gist options
  • Star 36 You must be signed in to star a gist
  • Fork 12 You must be signed in to fork a gist
  • Save asmaier/6465468 to your computer and use it in GitHub Desktop.
Save asmaier/6465468 to your computer and use it in GitHub Desktop.
Simple java junit test of an apache kafka producer (works with Kafka 0.11.0.2) (see also https://github.com/asmaier/mini-kafka)
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import org.apache.kafka.common.utils.Time;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import kafka.zk.EmbeddedZookeeper;
import static org.junit.Assert.*;
/**
 * Integration test that starts an {@link EmbeddedZookeeper} and a single Kafka broker
 * (via {@code TestUtils.createServer}), sends one message with a {@link KafkaProducer}
 * and reads it back with a {@link KafkaConsumer}.
 *
 * <p>The broker listens on the fixed address {@code 127.0.0.1:9092}, so that port must be free
 * when the test runs.
 *
 * <p>For online documentation see
 * https://github.com/apache/kafka/blob/0.10.0/core/src/test/scala/unit/kafka/utils/TestUtils.scala
 * https://github.com/apache/kafka/blob/0.10.0/core/src/main/scala/kafka/admin/AdminUtils.scala
 * https://github.com/apache/kafka/blob/0.10.0/core/src/main/scala/kafka/utils/ZkUtils.scala
 * http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
 * http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
 * http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecords.html
 * http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html
 * http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html
 */
public class KafkaProducerIT {

    private static final String ZKHOST = "127.0.0.1";
    private static final String BROKERHOST = "127.0.0.1";
    private static final String BROKERPORT = "9092";
    private static final String TOPIC = "test";

    /**
     * Sends a single record (key {@code 42}, value {@code "test-message"}) to {@link #TOPIC}
     * and asserts that exactly that record is returned by one consumer poll.
     *
     * <p>Broker, ZooKeeper client and ZooKeeper server are shut down in a {@code finally} block,
     * and producer/consumer are closed via try-with-resources, so a failing assertion does not
     * leave them running.
     *
     * @throws InterruptedException declared by the original test signature
     * @throws IOException if the temporary log directory for the broker cannot be created
     */
    @Test
    public void producerTest() throws InterruptedException, IOException {
        // setup Zookeeper
        EmbeddedZookeeper zkServer = new EmbeddedZookeeper();
        ZkClient zkClient = null;
        KafkaServer kafkaServer = null;
        try {
            String zkConnect = ZKHOST + ":" + zkServer.port();
            zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
            ZkUtils zkUtils = ZkUtils.apply(zkClient, false);

            // setup Broker
            Properties brokerProps = new Properties();
            brokerProps.setProperty("zookeeper.connect", zkConnect);
            brokerProps.setProperty("broker.id", "0");
            brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
            brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
            // only one broker is started, so the offsets topic cannot be replicated
            brokerProps.setProperty("offsets.topic.replication.factor", "1");
            KafkaConfig config = new KafkaConfig(brokerProps);
            Time mock = new MockTime();
            kafkaServer = TestUtils.createServer(config, mock);

            // create topic
            AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);

            // setup producer
            Properties producerProps = new Properties();
            producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
            producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
            producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

            // setup consumer
            Properties consumerProps = new Properties();
            consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
            consumerProps.setProperty("group.id", "group0");
            consumerProps.setProperty("client.id", "consumer0");
            consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
            consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            // to make sure the consumer starts from the beginning of the topic
            consumerProps.put("auto.offset.reset", "earliest");

            try (KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
                consumer.subscribe(Arrays.asList(TOPIC));

                // send message; the producer is closed before polling, as in the original flow
                try (KafkaProducer<Integer, byte[]> producer = new KafkaProducer<Integer, byte[]>(producerProps)) {
                    ProducerRecord<Integer, byte[]> data =
                            new ProducerRecord<>(TOPIC, 42, "test-message".getBytes(StandardCharsets.UTF_8));
                    producer.send(data);
                }

                // starting consumer
                ConsumerRecords<Integer, byte[]> records = consumer.poll(5000);
                assertEquals(1, records.count());
                Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator();
                ConsumerRecord<Integer, byte[]> record = recordIterator.next();

                // decode the payload once; formatting the raw byte[] with %s would only print
                // the array's identity string rather than the message text
                String value = new String(record.value(), StandardCharsets.UTF_8);
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), value);
                assertEquals(42, (int) record.key());
                assertEquals("test-message", value);
            }
        } finally {
            // tear down in reverse order of creation, even when an assertion above failed
            if (kafkaServer != null) {
                kafkaServer.shutdown();
            }
            if (zkClient != null) {
                zkClient.close();
            }
            zkServer.shutdown();
        }
    }
}
@curtisallen
Copy link

Thanks for this!
you can now create a topic by

// create topic
AdminUtils.createTopic(zkClient, topic, 1, 1, new Properties());

@asmaier
Copy link
Author

asmaier commented Nov 20, 2015

I have just updated the example to work with Kafka 0.8.2.2. I use the following dependencies:

<dependencies>
<dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>4.12</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.8.2.2</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.8.2.2</version>
  <classifier>test</classifier>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.8.2.2</version>
</dependency>
</dependencies>

To see more output you can also add a log4j.properties file to your test resources with the following content

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F#%M:%L) - %m%n

log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
log4j.logger.org.apache.zookeeper=WARN
log4j.logger.kafka=INFO

@asmaier
Copy link
Author

asmaier commented Nov 23, 2015

I added some code to also consume the message sent.

@jonrodriguezb
Copy link

Hi,

I'm trying to run this example but I can't. I'm getting the following error: java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;

It seems to be a problem with Scala, but I don't know how to fix it. The error occurs on the line that creates the MockTime.

This is my pom.xml file (dependencies):

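<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.8.2.2</version>
</dependency>
<dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>4.4</version>
</dependency>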

Help me please!

@adeel-shahzad
Copy link

perfect

@stanley-shi
Copy link

Thanks, this is really helpful!

@asmaier
Copy link
Author

asmaier commented Jul 1, 2016

I have just updated the example to work with Kafka 0.10.0.0 using the new producer and consumer API. I now use the following dependencies:

<dependencies>
 <dependency>
   <groupId>junit</groupId>
   <artifactId>junit</artifactId>
   <version>4.12</version>
 </dependency>
 <dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.10.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.10.0.0</version>
  <classifier>test</classifier>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.0.0</version>
  <classifier>test</classifier>
  <scope>test</scope>
</dependency>
</dependencies>

@topicscout
Copy link

topicscout commented Jul 1, 2016

Hi,

I just tried the recently posted 0.10.0.0 version. It compiled and ran under JDK 8 on a Mac. I ran the tests within IntelliJ.
Test passed. But there were some logged exceptions, and I would just like to know if those exceptions are expected
as part of the test or not.

And thanks again for this! Much needed!

regards,

Stefan

[main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
[main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server environment:host.name=192.168.2.227
[main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server environment:java.version=1.8.0_91
[main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server environment:java.vendor=Oracle Corporation
[main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.io.tmpdir=/var/folders/81/zg01zvv52p79rfzbfxcdkhym0000gn/T/
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.compiler=
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.name=Mac OS X
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.arch=x86_64
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.version=10.11.1
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.name=xxxxxxx
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.home=xxxxxxxxxxxxxx
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.dir=/Users/xxxxxxx/dev
[main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=127.0.0.1:64411 sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@2641e737
[main-SendThread(127.0.0.1:64411)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:64411. Will not attempt to authenticate using SASL (unknown error)
[main-SendThread(127.0.0.1:64411)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:64411, initiating session
[NIOServerCxn.Factory:/127.0.0.1:0] INFO org.apache.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:64413
[NIOServerCxn.Factory:/127.0.0.1:0] INFO org.apache.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:64413
[SyncThread:0] INFO org.apache.zookeeper.server.persistence.FileTxnLog - Creating new log file: log.1
[SyncThread:0] INFO org.apache.zookeeper.server.ZooKeeperServer - Established session 0x155a7c9cb2e0000 with negotiated timeout 10000 for client /127.0.0.1:64413
[main-SendThread(127.0.0.1:64411)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:64411, sessionid = 0x155a7c9cb2e0000, negotiated timeout = 10000
[main] INFO kafka.server.KafkaConfig - KafkaConfig values:
advertised.host.name = null
metric.reporters = []
quota.producer.default = 9223372036854775807
offsets.topic.num.partitions = 50
log.flush.interval.messages = 9223372036854775807
auto.create.topics.enable = true
controller.socket.timeout.ms = 30000
log.flush.interval.ms = null
principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
replica.socket.receive.buffer.bytes = 65536
min.insync.replicas = 1
replica.fetch.wait.max.ms = 500
num.recovery.threads.per.data.dir = 1
ssl.keystore.type = JKS
sasl.mechanism.inter.broker.protocol = GSSAPI
default.replication.factor = 1
ssl.truststore.password = null
log.preallocate = false
sasl.kerberos.principal.to.local.rules = [DEFAULT]
fetch.purgatory.purge.interval.requests = 1000
ssl.endpoint.identification.algorithm = null
replica.socket.timeout.ms = 30000
message.max.bytes = 1000012
num.io.threads = 8
offsets.commit.required.acks = -1
log.flush.offset.checkpoint.interval.ms = 60000
delete.topic.enable = false
quota.window.size.seconds = 1
ssl.truststore.type = JKS
offsets.commit.timeout.ms = 5000
quota.window.num = 11
zookeeper.connect = 127.0.0.1:64411
authorizer.class.name =
num.replica.fetchers = 1
log.retention.ms = null
log.roll.jitter.hours = 0
log.cleaner.enable = true
offsets.load.buffer.size = 5242880
log.cleaner.delete.retention.ms = 86400000
ssl.client.auth = none
controlled.shutdown.max.retries = 3
queued.max.requests = 500
offsets.topic.replication.factor = 3
log.cleaner.threads = 1
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
socket.request.max.bytes = 104857600
ssl.trustmanager.algorithm = PKIX
zookeeper.session.timeout.ms = 6000
log.retention.bytes = -1
log.message.timestamp.type = CreateTime
sasl.kerberos.min.time.before.relogin = 60000
zookeeper.set.acl = false
connections.max.idle.ms = 600000
offsets.retention.minutes = 1440
replica.fetch.backoff.ms = 1000
inter.broker.protocol.version = 0.10.0-IV1
log.retention.hours = 168
num.partitions = 1
broker.id.generation.enable = true
listeners = PLAINTEXT://127.0.0.1:9092
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
log.roll.ms = null
log.flush.scheduler.interval.ms = 9223372036854775807
ssl.cipher.suites = null
log.index.size.max.bytes = 10485760
ssl.keymanager.algorithm = SunX509
security.inter.broker.protocol = PLAINTEXT
replica.fetch.max.bytes = 1048576
advertised.port = null
log.cleaner.dedupe.buffer.size = 134217728
replica.high.watermark.checkpoint.interval.ms = 5000
log.cleaner.io.buffer.size = 524288
sasl.kerberos.ticket.renew.window.factor = 0.8
zookeeper.connection.timeout.ms = null
controlled.shutdown.retry.backoff.ms = 5000
log.roll.hours = 168
log.cleanup.policy = delete
host.name =
log.roll.jitter.ms = null
max.connections.per.ip = 2147483647
offsets.topic.segment.bytes = 104857600
background.threads = 10
quota.consumer.default = 9223372036854775807
request.timeout.ms = 30000
log.message.format.version = 0.10.0-IV1
log.index.interval.bytes = 4096
log.dir = /tmp/kafka-logs
log.segment.bytes = 1073741824
log.cleaner.backoff.ms = 15000
offset.metadata.max.bytes = 4096
ssl.truststore.location = null
group.max.session.timeout.ms = 300000
ssl.keystore.password = null
zookeeper.sync.time.ms = 2000
port = 9092
log.retention.minutes = null
log.segment.delete.delay.ms = 60000
log.dirs = /var/folders/81/zg01zvv52p79rfzbfxcdkhym0000gn/T/kafka-7807231158003385273
controlled.shutdown.enable = true
compression.type = producer
max.connections.per.ip.overrides =
log.message.timestamp.difference.max.ms = 9223372036854775807
sasl.kerberos.kinit.cmd = /usr/bin/kinit
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
auto.leader.rebalance.enable = true
leader.imbalance.check.interval.seconds = 300
log.cleaner.min.cleanable.ratio = 0.5
replica.lag.time.max.ms = 10000
num.network.threads = 3
ssl.key.password = null
reserved.broker.max.id = 1000
metrics.num.samples = 2
socket.send.buffer.bytes = 102400
ssl.protocol = TLS
socket.receive.buffer.bytes = 102400
ssl.keystore.location = null
replica.fetch.min.bytes = 1
broker.rack = null
unclean.leader.election.enable = true
sasl.enabled.mechanisms = [GSSAPI]
group.min.session.timeout.ms = 6000
log.cleaner.io.buffer.load.factor = 0.9
offsets.retention.check.interval.ms = 600000
producer.purgatory.purge.interval.requests = 1000
metrics.sample.window.ms = 30000
broker.id = 0
offsets.topic.compression.codec = 0
log.retention.check.interval.ms = 300000
advertised.listeners = null
leader.imbalance.per.broker.percentage = 10

[main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=127.0.0.1:64411 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@15ca7889
[main-SendThread(127.0.0.1:64411)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:64411. Will not attempt to authenticate using SASL (unknown error)
[main-SendThread(127.0.0.1:64411)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:64411, initiating session
[NIOServerCxn.Factory:/127.0.0.1:0] INFO org.apache.zookeeper.server.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:64414
[NIOServerCxn.Factory:/127.0.0.1:0] INFO org.apache.zookeeper.server.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:64414
[SyncThread:0] INFO org.apache.zookeeper.server.ZooKeeperServer - Established session 0x155a7c9cb2e0001 with negotiated timeout 6000 for client /127.0.0.1:64414
[main-SendThread(127.0.0.1:64411)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:64411, sessionid = 0x155a7c9cb2e0001, negotiated timeout = 6000
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x5 zxid:0x4 txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NoNode for /brokers
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xb zxid:0x8 txntype:-1 reqpath:n/a Error Path:/config Error:KeeperErrorCode = NoNode for /config
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x13 zxid:0xd txntype:-1 reqpath:n/a Error Path:/admin Error:KeeperErrorCode = NoNode for /admin
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:setData cxid:0x21 zxid:0x13 txntype:-1 reqpath:n/a Error Path:/controller_epoch Error:KeeperErrorCode = NoNode for /controller_epoch
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:delete cxid:0x30 zxid:0x15 txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x37 zxid:0x16 txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x38 zxid:0x17 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : b8642491e78c5a13
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0000 type:setData cxid:0x4 zxid:0x19 txntype:-1 reqpath:n/a Error Path:/config/topics/test Error:KeeperErrorCode = NoNode for /config/topics/test
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0000 type:create cxid:0x5 zxid:0x1a txntype:-1 reqpath:n/a Error Path:/config/topics Error:KeeperErrorCode = NodeExists for /config/topics
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [127.0.0.1:9092]
ssl.keystore.type = JKS
sasl.mechanism = GSSAPI
max.block.ms = 60000
interceptor.classes = null
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
request.timeout.ms = 30000
acks = 1
receive.buffer.bytes = 32768
ssl.truststore.type = JKS
retries = 0
ssl.truststore.location = null
ssl.keystore.password = null
send.buffer.bytes = 131072
compression.type = none
metadata.fetch.timeout.ms = 60000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
batch.size = 16384
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
max.request.size = 1048576
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
linger.ms = 0

[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x42 zxid:0x1d txntype:-1 reqpath:n/a Error Path:/brokers/topics/test/partitions/0 Error:KeeperErrorCode = NoNode for /brokers/topics/test/partitions/0
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [127.0.0.1:9092]
ssl.keystore.type = JKS
sasl.mechanism = GSSAPI
max.block.ms = 60000
interceptor.classes = null
ssl.truststore.password = null
client.id = producer-1
ssl.endpoint.identification.algorithm = null
request.timeout.ms = 30000
acks = 1
receive.buffer.bytes = 32768
ssl.truststore.type = JKS
retries = 0
ssl.truststore.location = null
ssl.keystore.password = null
send.buffer.bytes = 131072
compression.type = none
metadata.fetch.timeout.ms = 60000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
batch.size = 16384
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
max.request.size = 1048576
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
linger.ms = 0

[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x43 zxid:0x1e txntype:-1 reqpath:n/a Error Path:/brokers/topics/test/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/test/partitions
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : b8642491e78c5a13
[main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [127.0.0.1:9092]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id = consumer0
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id = group0
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.IntegerDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = earliest

[main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [127.0.0.1:9092]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id = consumer0
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id = group0
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.IntegerDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = earliest

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : b8642491e78c5a13
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 0 : {test=LEADER_NOT_AVAILABLE}
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:setData cxid:0x4f zxid:0x22 txntype:-1 reqpath:n/a Error Path:/config/topics/__consumer_offsets Error:KeeperErrorCode = NoNode for /config/topics/__consumer_offsets
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x50 zxid:0x23 txntype:-1 reqpath:n/a Error Path:/config/topics Error:KeeperErrorCode = NodeExists for /config/topics
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x89 zxid:0x26 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/32 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/32
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x8a zxid:0x27 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x8e zxid:0x2b txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/16 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/16
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x91 zxid:0x2e txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/49 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/49
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x94 zxid:0x31 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/44 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/44
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x97 zxid:0x34 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/28 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/28
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x9a zxid:0x37 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/17 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/17
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x9d zxid:0x3a txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/23 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/23
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xa0 zxid:0x3d txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/7 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/7
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xa3 zxid:0x40 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/4 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/4
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xa6 zxid:0x43 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/29 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/29
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xa9 zxid:0x46 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/35 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/35
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xac zxid:0x49 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/3 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/3
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xaf zxid:0x4c txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/24 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/24
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xb2 zxid:0x4f txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/41 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/41
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xb5 zxid:0x52 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/0 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/0
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xb8 zxid:0x55 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/38 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/38
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xbb zxid:0x58 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/13 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/13
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xbe zxid:0x5b txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/8 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/8
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xc1 zxid:0x5e txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/5 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/5
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xc4 zxid:0x61 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/39 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/39
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xc7 zxid:0x64 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/36 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/36
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xca zxid:0x67 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/40 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/40
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xcd zxid:0x6a txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/45 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/45
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xd0 zxid:0x6d txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/15 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/15
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xd3 zxid:0x70 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/33 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/33
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xd6 zxid:0x73 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/37 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/37
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xd9 zxid:0x76 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/21 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/21
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xdc zxid:0x79 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/6 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/6
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xdf zxid:0x7c txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/11 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/11
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xe2 zxid:0x7f txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/20 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/20
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xe5 zxid:0x82 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/47 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/47
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xe8 zxid:0x85 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/2 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/2
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xeb zxid:0x88 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/27 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/27
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xee zxid:0x8b txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/34 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/34
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xf1 zxid:0x8e txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/9 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/9
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xf4 zxid:0x91 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/22 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/22
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xf7 zxid:0x94 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/42 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/42
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xfa zxid:0x97 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/14 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/14
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0xfd zxid:0x9a txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/25 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/25
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x100 zxid:0x9d txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/10 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/10
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x103 zxid:0xa0 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/48 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/48
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x106 zxid:0xa3 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/31 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/31
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x109 zxid:0xa6 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/18 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/18
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x10c zxid:0xa9 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/19 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/19
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x10f zxid:0xac txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/12 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/12
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x112 zxid:0xaf txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/46 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/46
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x115 zxid:0xb2 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/43 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/43
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x118 zxid:0xb5 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/1 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/1
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x11b zxid:0xb8 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/26 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/26
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x155a7c9cb2e0001 type:create cxid:0x11e zxid:0xbb txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/30 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/30
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator 127.0.0.1:9092 (id: 2147483647 rack: null) for group group0.
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [] for group group0
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group group0
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Successfully joined group group0 with generation 1
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Setting newly assigned partitions [test-0] for group group0
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x155a7c9cb2e0001
[main] INFO org.apache.zookeeper.ZooKeeper - Session: 0x155a7c9cb2e0001 closed
[main-EventThread] INFO org.apache.zookeeper.ClientCnxn - EventThread shut down
[NIOServerCxn.Factory:/127.0.0.1:0] INFO org.apache.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:64414 which had sessionid 0x155a7c9cb2e0001
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - Processed session termination for sessionid: 0x155a7c9cb2e0000
[main] INFO org.apache.zookeeper.ZooKeeper - Session: 0x155a7c9cb2e0000 closed
[main-EventThread] INFO org.apache.zookeeper.ClientCnxn - EventThread shut down
[NIOServerCxn.Factory:/127.0.0.1:0] INFO org.apache.zookeeper.server.NIOServerCnxn - Closed socket connection for client /127.0.0.1:64413 which had sessionid 0x155a7c9cb2e0000
[main] INFO org.apache.zookeeper.server.ZooKeeperServer - shutting down
[main] INFO org.apache.zookeeper.server.SessionTrackerImpl - Shutting down
[main] INFO org.apache.zookeeper.server.PrepRequestProcessor - Shutting down
[main] INFO org.apache.zookeeper.server.SyncRequestProcessor - Shutting down
[ProcessThread(sid:0 cport:-1):] INFO org.apache.zookeeper.server.PrepRequestProcessor - PrepRequestProcessor exited loop!
[SyncThread:0] INFO org.apache.zookeeper.server.SyncRequestProcessor - SyncRequestProcessor exited!
[main] INFO org.apache.zookeeper.server.FinalRequestProcessor - shutdown of request processor complete
[NIOServerCxn.Factory:/127.0.0.1:0] INFO org.apache.zookeeper.server.NIOServerCnxnFactory - NIOServerCnxn factory exited run method
[main] INFO org.apache.zookeeper.server.ZooKeeperServer - shutting down
[main] INFO org.apache.zookeeper.server.SessionTrackerImpl - Shutting down
[main] INFO org.apache.zookeeper.server.PrepRequestProcessor - Shutting down
[main] INFO org.apache.zookeeper.server.SyncRequestProcessor - Shutting down
[main] INFO org.apache.zookeeper.server.FinalRequestProcessor - shutdown of request processor complete
offset = 0, key = 42, value = [B@4f6f416f
Process finished with exit code 0

@srinidhis94
Copy link

Hi,
I am getting an assertion error at:
ConsumerRecords<Integer, byte[]> records = consumer.poll(1000);
assertEquals(1, records.count());

So basically the consumer is not receiving any records. What might be the reason, and how can I fix this?

@an247
Copy link

an247 commented Dec 8, 2016

Thanks for this.
@srinidhis94 and others who may have faced the same issue with the consumer not reading anything, simply bumping up the poll timeout value from 1000 should work.

@shineedbasheer
Copy link

I am also getting the same assertion error at:

 ConsumerRecords<Integer, byte[]> records = consumer.poll(1000);
assertEquals(1, records.count());

Same as @srinidhis94.

I tried what @an247 suggested and increased the poll timeout, but that did not solve my problem.

@abtpst
Copy link

abtpst commented Oct 24, 2017

Hi @asmaier, thank you so much for this. I have used this successfully in the past. Does the code work for Kafka 0.10.1.0?

Recently I have been getting:

java.lang.NoSuchFieldError: DEFAULT_SASL_ENABLED_MECHANISMS
    at kafka.server.Defaults$.<init>(KafkaConfig.scala:183)
    at kafka.server.Defaults$.<clinit>(KafkaConfig.scala)
    at kafka.log.Defaults$.<init>(LogConfig.scala:35)
    at kafka.log.Defaults$.<clinit>(LogConfig.scala)
    at kafka.log.LogConfig$.<init>(LogConfig.scala:246)
    at kafka.log.LogConfig$.<clinit>(LogConfig.scala)
    at kafka.server.KafkaConfig$.<init>(KafkaConfig.scala:270)
    at kafka.server.KafkaConfig$.<clinit>(KafkaConfig.scala)
    at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:795)
    at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:797)
    at com.ibm.whi.bap.helper.test.kafka.KafkaServerTest.<init>(KafkaServerTest.java:56)
    at com.ibm.whi.bap.helper.test.kafka.KafkaTest.checkAllProperties(KafkaTest.java:115)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
    at org.junit.rules.TestWatchman$1.evaluate(TestWatchman.java:48)
    at org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)

java.lang.NullPointerException
    at com.ibm.whi.bap.helper.test.kafka.KafkaTest.tearDown(KafkaTest.java:58)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:37)
    at org.junit.rules.TestWatchman$1.evaluate(TestWatchman.java:48)
    at org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)

at

KafkaConfig config = new KafkaConfig(brokerProps);

I did not change anything else. Any idea what might be wrong? Here are my Maven dependencies:

       <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.1.0</version>
            <classifier>test</classifier>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.0</version>
            <classifier>test</classifier>
            <scope>test</scope>
        </dependency>

@asmaier
Copy link
Author

asmaier commented Dec 10, 2017

I have just updated the example to work with Kafka 0.11.0.2. I also increased the poll timeout to 5000 ms. I now use the following dependencies:

<dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.11.0.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.11.0.2</version>
      <classifier>test</classifier>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.11.0.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.11.0.2</version>
      <classifier>test</classifier>
      <scope>test</scope>
    </dependency>
  </dependencies>

@asubb
Copy link

asubb commented Oct 11, 2018

Gradle and Kotlin example based on this implementation.

build.gradle dependencies section:

    // Shared version numbers, referenced below as $ver.junit and $ver.kafka.
    ver = [junit: '4.12', kafka: '1.1.1']
    testImplementation "junit:junit:$ver.junit"
    implementation "org.apache.kafka:kafka-clients:$ver.kafka"
    // A fourth ":test" segment selects the artifact's "test" classifier jar
    // (presumably where the Kafka test helpers used by KafkaEmbedded.kt live - confirm).
    testImplementation "org.apache.kafka:kafka-clients:$ver.kafka:test"
    testImplementation "org.apache.kafka:kafka_2.11:$ver.kafka"
    testImplementation "org.apache.kafka:kafka_2.11:$ver.kafka:test"

KafkaEmbedded.kt

/**
 * In-process Kafka broker backed by an embedded ZooKeeper, intended for tests.
 *
 * Constructing an instance starts an [EmbeddedZookeeper], starts one Kafka broker
 * (`broker.id` 0) listening on `PLAINTEXT://127.0.0.1:<port>`, creates [topic] with a
 * single partition and replication factor 1, and then waits (up to 5000 ms) until the
 * broker has the metadata for partition 0 of that topic.
 *
 * Call [close] (or wrap the instance in `use { ... }`) to stop everything again.
 *
 * @param port  TCP port the broker listens on, on 127.0.0.1.
 * @param topic name of the topic that is created during construction.
 */
class KafkaEmbedded(port: Int, topic: String) : Closeable {

    private val server: KafkaServer
    private val zkClient: ZkClient
    private val zkServer: EmbeddedZookeeper

    init {
        zkServer = EmbeddedZookeeper()
        val zkConnect = "127.0.0.1:${zkServer.port()}"

        val props = Properties()
        props.setProperty("zookeeper.connect", zkConnect)
        props.setProperty("broker.id", "0")
        // Each instance writes its log segments to a fresh temporary directory.
        props.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString())
        props.setProperty("listeners", "PLAINTEXT://127.0.0.1:$port")
        // There is only one broker, so the internal offsets topic cannot be replicated.
        props.setProperty("offsets.topic.replication.factor", "1")

        server = KafkaServer(
            KafkaConfig(props),
            Time.SYSTEM,
            Option.apply("kafka-broker"),
            scala.collection.JavaConversions.asScalaBuffer(emptyList())
        )
        server.startup()

        // `ZKStringSerializer$`.`MODULE$` is how the Scala singleton object is reached from Kotlin.
        zkClient = ZkClient(zkConnect, 30000, 30000, `ZKStringSerializer$`.`MODULE$`)
        val zkUtils = ZkUtils.apply(zkClient, false)
        AdminUtils.createTopic(zkUtils, topic, 1, 1, Properties(), RackAwareMode.`Disabled$`.`MODULE$`)

        TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(listOf(server)), topic, 0, 5000)
    }

    /**
     * Shuts down the broker, then the ZooKeeper client, then the embedded ZooKeeper server.
     *
     * Each step runs in a `finally` of the previous one, so a failure while stopping the
     * broker no longer leaves the ZooKeeper client and server running. The first exception
     * thrown still propagates to the caller.
     */
    override fun close() {
        try {
            server.shutdown()
            server.awaitShutdown()
        } finally {
            try {
                zkClient.close()
            } finally {
                zkServer.shutdown()
            }
        }
    }
}

Usage example:

// Starts the embedded broker on port 12345 with a topic named "test".
// `use` calls close() when the block finishes, whether it returns normally or throws.
KafkaEmbedded(12345, "test").use {
   // produce-consume-assert
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment