Last active
January 10, 2020 08:36
-
-
Save kamalcph/46ba855f49a16c8a9c2e427bdff87ca3 to your computer and use it in GitHub Desktop.
EAR-11564 Reproducer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * Copyright (c) 2016-2020 Cloudera, Inc. All rights reserved.
 *
 * This code is provided to you pursuant to your written agreement with Cloudera, which may be the terms of the
 * Affero General Public License version 3 (AGPLv3), or pursuant to a written agreement with a third party authorized
 * to distribute this code. If you do not have a written agreement with Cloudera or with an authorized and
 * properly licensed third party, you do not have any rights to this code.
 *
 * If this code is provided to you under the terms of the AGPLv3:
 * (A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY KIND;
 * (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT
 * LIMITED TO IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE;
 * (C) CLOUDERA IS NOT LIABLE TO YOU, AND WILL NOT DEFEND, INDEMNIFY, OR HOLD YOU HARMLESS FOR ANY CLAIMS ARISING
 * FROM OR RELATED TO THE CODE; AND
 * (D) WITH RESPECT TO YOUR EXERCISE OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED
 * TO, DAMAGES RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF BUSINESS ADVANTAGE OR
 * UNAVAILABILITY, OR LOSS OR CORRUPTION OF DATA.
 */
package org.apache.kafka.clients.consumer; | |
import org.apache.kafka.clients.admin.AdminClient; | |
import org.apache.kafka.clients.admin.AdminClientConfig; | |
import org.apache.kafka.clients.admin.NewTopic; | |
import org.apache.kafka.clients.producer.KafkaProducer; | |
import org.apache.kafka.clients.producer.ProducerConfig; | |
import org.apache.kafka.clients.producer.ProducerRecord; | |
import org.apache.kafka.common.PartitionInfo; | |
import org.apache.kafka.common.TopicPartition; | |
import org.apache.kafka.common.serialization.ByteArrayDeserializer; | |
import org.apache.kafka.common.serialization.ByteArraySerializer; | |
import java.util.ArrayList; | |
import java.util.Collections; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.regex.Pattern; | |
public class MyTest2 { | |
private static final String BOOTSTRAP_SERVERS = "localhost:9092"; | |
private static final String groupId = "salem"; | |
private static final String topic = "mango"; | |
public static void main(String[] args) { | |
// admin client | |
final Map<String, Object> adminClientProps = getAdminClientProps("admin-" + Thread.currentThread().getId()); | |
try (final AdminClient client = AdminClient.create(adminClientProps)) { | |
client.createTopics(Collections.singleton(new NewTopic(topic, 10, (short) 1))); | |
} | |
final ExecutorService executor = Executors.newFixedThreadPool(2); | |
// producer | |
executor.execute(new Runnable() { | |
@Override | |
public void run() { | |
final String clientId = "producer-" + Thread.currentThread().getId(); | |
final byte[] valueBytes = clientId.getBytes(); | |
try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(getProducerProps(clientId))) { | |
while (true) { | |
for (int i=0; i<100; i++) { | |
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, valueBytes); | |
producer.send(record); | |
} | |
try { | |
Thread.sleep(1); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
} | |
}); | |
// consumer restarts frequently to simulate the error event. | |
executor.execute(new Runnable() { | |
@Override | |
public void run() { | |
final Map<TopicPartition, Long> beginPositions = new HashMap<>(); | |
while (true) { | |
getConsumer(false, beginPositions).run(); | |
try { | |
Thread.sleep(1000); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
}); | |
} | |
private static Runnable getConsumer(final boolean runContinuously, final Map<TopicPartition, Long> beginPositions) { | |
return new Runnable() { | |
@Override | |
public void run() { | |
final String clientId = "client-" + Thread.currentThread().getId(); | |
int counter = runContinuously ? Integer.MAX_VALUE : 50; | |
System.out.println("beginPositions: " + beginPositions); | |
try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(getConsumerProps(clientId))) { | |
consumer.subscribe(Collections.singleton(topic)); // subscribe mode | |
// consumer.subscribe(Pattern.compile(topic)); // subscribePattern mode | |
// assignTopicPartitions(consumer); // assign mode | |
consumer.poll(0); | |
do { | |
for (TopicPartition partition : consumer.assignment()) { | |
final long currentPosition = consumer.position(partition); | |
final Long beginPosition = beginPositions.get(partition); | |
if (beginPosition != null && beginPosition > currentPosition) { | |
System.err.println(partition + ": beginPosition: " + beginPosition + " is higher than the currentPosition: " + currentPosition); | |
System.exit(1); | |
} else { | |
System.out.println("Processing " + partition + " nextBatch : " + beginPosition + " -> " + currentPosition); | |
beginPositions.put(partition, currentPosition); | |
} | |
} | |
System.out.println("Updated beginPositions: " + beginPositions + ", counter = " + counter); | |
consumer.commitAsync(); | |
consumer.seekToEnd(consumer.assignment()); | |
} while (counter-- > 0); | |
} | |
} | |
private void assignTopicPartitions(final KafkaConsumer<byte[], byte[]> consumer) { | |
final List<TopicPartition> partitions = new ArrayList<>(); | |
for (final PartitionInfo info : consumer.partitionsFor(topic)) { | |
partitions.add(new TopicPartition(info.topic(), info.partition())); | |
} | |
consumer.assign(partitions); | |
} | |
}; | |
} | |
private static Map<String, Object> getProducerProps(final String clientId) { | |
final Map<String, Object> producerProps = new HashMap<>(); | |
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); | |
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); | |
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); | |
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); | |
return producerProps; | |
} | |
private static Map<String, Object> getConsumerProps(final String clientId) { | |
final Map<String, Object> consumerProps = new HashMap<>(); | |
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); | |
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); | |
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); | |
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); | |
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); | |
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); | |
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); | |
return consumerProps; | |
} | |
private static Map<String, Object> getAdminClientProps(final String clientId) { | |
final Map<String, Object> adminClientProps = new HashMap<>(); | |
adminClientProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); | |
adminClientProps.put(AdminClientConfig.CLIENT_ID_CONFIG, clientId); | |
return adminClientProps; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment