Created
January 8, 2020 09:08
-
-
Save kamalcph/ed22dc2f9e04f4154d2b46d2257d6096 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright (c) 2016-2019 Cloudera, Inc. All rights reserved. | |
* | |
* This code is provided to you pursuant to your written agreement with Cloudera, which may be the terms of the | |
* Affero General Public License version 3 (AGPLv3), or pursuant to a written agreement with a third party authorized | |
* to distribute this code. If you do not have a written agreement with Cloudera or with an authorized and | |
* properly licensed third party, you do not have any rights to this code. | |
* | |
* If this code is provided to you under the terms of the AGPLv3: | |
* (A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY KIND; | |
* (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT | |
* LIMITED TO IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE; | |
* (C) CLOUDERA IS NOT LIABLE TO YOU, AND WILL NOT DEFEND, INDEMNIFY, OR HOLD YOU HARMLESS FOR ANY CLAIMS ARISING | |
* FROM OR RELATED TO THE CODE; AND | |
* (D) WITH RESPECT TO YOUR EXERCISE OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY | |
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED | |
* TO, DAMAGES RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF BUSINESS ADVANTAGE OR | |
* UNAVAILABILITY, OR LOSS OR CORRUPTION OF DATA. | |
*/ | |
package org.apache.kafka.clients.consumer; | |
import org.apache.kafka.clients.admin.AdminClient; | |
import org.apache.kafka.clients.admin.AdminClientConfig; | |
import org.apache.kafka.clients.admin.NewTopic; | |
import org.apache.kafka.clients.producer.KafkaProducer; | |
import org.apache.kafka.clients.producer.ProducerConfig; | |
import org.apache.kafka.clients.producer.ProducerRecord; | |
import org.apache.kafka.common.TopicPartition; | |
import org.apache.kafka.common.serialization.ByteArrayDeserializer; | |
import org.apache.kafka.common.serialization.ByteArraySerializer; | |
import java.util.Collection; | |
import java.util.Collections; | |
import java.util.HashMap; | |
import java.util.Map; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
/** | |
* Set the group.initial.rebalance.delay.ms=0 in the server properties. | |
*/ | |
public class MyTest { | |
private static final String BOOTSTRAP_SERVERS = "localhost:9092"; | |
private static final String groupId = "sample"; | |
private static final String topic = "test"; | |
public static void main(String[] args) { | |
// admin client | |
try (final AdminClient client = AdminClient.create(getAdminClientProps("admin"))) { | |
client.createTopics(Collections.singleton(new NewTopic(topic, 10, (short) 1))); | |
} | |
final ExecutorService executor = Executors.newFixedThreadPool(3); | |
// producer | |
executor.execute(new Runnable() { | |
@Override | |
public void run() { | |
final String clientId = "producer-" + Thread.currentThread().getId(); | |
final byte[] valueBytes = clientId.getBytes(); | |
try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(getProducerProps(clientId))) { | |
while (true) { | |
for (int i=0; i<100; i++) { | |
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, valueBytes); | |
producer.send(record); | |
} | |
try { | |
Thread.sleep(10); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
} | |
}); | |
// consumer-1 | |
executor.execute(getConsumer(true)); | |
// consumer-2 restarts frequently to simulate the re-balance. | |
executor.execute(new Runnable() { | |
@Override | |
public void run() { | |
while (true) { | |
getConsumer(false).run(); | |
try { | |
Thread.sleep(5000); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
}); | |
} | |
private static Runnable getConsumer(final boolean runContinuously) { | |
return new Runnable() { | |
@Override | |
public void run() { | |
final String clientId = "client-" + Thread.currentThread().getId(); | |
final Map<TopicPartition, Long> beginPositions = new HashMap<>(); | |
try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(getConsumerProps(clientId))) { | |
consumer.subscribe(Collections.singleton(topic), new ConsumerRebalanceListener() { | |
@Override | |
public void onPartitionsRevoked(Collection<TopicPartition> partitions) { | |
for (TopicPartition partition : partitions) { | |
beginPositions.remove(partition); | |
} | |
} | |
@Override | |
public void onPartitionsAssigned(Collection<TopicPartition> partitions) { | |
for (TopicPartition partition : partitions) { | |
beginPositions.put(partition, consumer.position(partition)); | |
} | |
} | |
}); | |
consumer.poll(0); | |
System.out.println("Begin Positions: " + beginPositions); | |
do { | |
final ConsumerRecords<byte[], byte[]> records = consumer.poll(100); | |
for (ConsumerRecord<byte[], byte[]> record: records) { | |
beginPositions.put(new TopicPartition(record.topic(), record.partition()), record.offset()); | |
} | |
System.out.println("Updated the beginPositions: " + beginPositions); | |
consumer.commitAsync(); | |
consumer.seekToEnd(consumer.assignment()); | |
for (TopicPartition partition : consumer.assignment()) { | |
final Long beginOffset = beginPositions.get(partition); | |
final long endOffset = consumer.position(partition); | |
if (beginOffset > endOffset) { | |
System.err.println("Begin Offset: " + beginOffset + " is higher than the end offset: " + endOffset); | |
System.exit(1); | |
} else { | |
System.out.println(clientId + ": Processing " + partition + " next batch : " + beginOffset + " -> " + endOffset); | |
} | |
} | |
} while (runContinuously); | |
} | |
} | |
}; | |
} | |
private static Map<String, Object> getProducerProps(final String clientId) { | |
final Map<String, Object> producerProps = new HashMap<>(); | |
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); | |
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); | |
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); | |
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); | |
return producerProps; | |
} | |
private static Map<String, Object> getConsumerProps(final String clientId) { | |
final Map<String, Object> consumerProps = new HashMap<>(); | |
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); | |
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); | |
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); | |
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); | |
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); | |
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); | |
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); | |
return consumerProps; | |
} | |
private static Map<String, Object> getAdminClientProps(final String clientId) { | |
final Map<String, Object> adminClientProps = new HashMap<>(); | |
adminClientProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); | |
adminClientProps.put(AdminClientConfig.CLIENT_ID_CONFIG, clientId); | |
return adminClientProps; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment