@bdelbosc
Created April 12, 2021 11:49
/*
 * (C) Copyright 2021 Nuxeo (http://nuxeo.com/) and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Contributors:
 *     bdelbosc
 */
package org.nuxeo.lib.stream.tests;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.junit.Test;

/**
 * Shows an offset numbering difference between Kafka and RedPanda: after producing a single
 * record to a fresh topic, Kafka reports an end offset of 1, while the RedPanda version tested
 * here reported 2 because its first offset was 1 instead of 0.
 */
public class TestRedPanda {

    protected static final short DEFAULT_REPLICATION = 1;

    protected static final String BOOTSTRAP_SERVERS_PROP = "kafka.bootstrap.servers";

    protected static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";

    public String getPrefixedTopic(String topic) {
        return "test-" + System.currentTimeMillis() + "-" + topic;
    }

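    /**
     * Produces a single record to a fresh single-partition topic, then reads back the end offset
     * of the partition to compare Kafka and RedPanda numbering.
     */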
    @Test
    public void testEndOffset() {
        String topic = getPrefixedTopic("testSendMessage");
        int partitions = 1;
        int records = 1;
        createTopic(topic, partitions);
        // produce some records; closing the producer via try-with-resources flushes pending sends
        try (Producer<String, String> producer = new KafkaProducer<>(getProducerProperties())) {
            assertNotNull(producer);
            assertEquals(partitions, producer.partitionsFor(topic).size());
            TestCallback callback = new TestCallback();
            for (long i = 0; i < records; i++) {
                ProducerRecord<String, String> data = new ProducerRecord<>(topic, "key-" + i, "message-" + i);
                producer.send(data, callback);
            }
        }
        // Kafka returns 1, RedPanda returns 2 because its first offset is 1 instead of 0
        assertEquals(1, getEndOffset("a-group", topic, 0));
        deleteTopic(topic);
    }

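    /**
     * Returns the end offset of the partition, that is, the offset of the next record to be
     * written, using {@link KafkaConsumer#endOffsets}.
     */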
    protected long getEndOffset(String group, String topic, int partition) {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerProperties(group))) {
            TopicPartition topicPartition = new TopicPartition(topic, partition);
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(Collections.singleton(topicPartition));
            return endOffsets.get(topicPartition);
        }
    }

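    /**
     * Returns the bootstrap servers from the {@code kafka.bootstrap.servers} system property,
     * falling back to {@code localhost:9092}.
     */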
    public static String getBootstrapServers() {
        String bootstrapServers = System.getProperty(BOOTSTRAP_SERVERS_PROP, DEFAULT_BOOTSTRAP_SERVERS);
        if (bootstrapServers == null || bootstrapServers.isEmpty()) {
            bootstrapServers = DEFAULT_BOOTSTRAP_SERVERS;
        }
        return bootstrapServers;
    }

    protected Properties getAdminProperties() {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
        return props;
    }

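    /**
     * Producer properties with String serializers; retries, acks and batch size are set
     * explicitly to the default values noted in the inline comments.
     */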
    protected Properties getProducerProperties() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.RETRIES_CONFIG, 0); // default
        props.put(ProducerConfig.ACKS_CONFIG, "1"); // default
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // default
        return props;
    }

    protected Properties getConsumerProperties(String group) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        return props;
    }

    protected boolean topicExists(String topic) {
        try (AdminClient adminClient = AdminClient.create(getAdminProperties())) {
            adminClient.describeTopics(Collections.singletonList(topic)).values().get(topic).get();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof UnknownTopicOrPartitionException) {
                return false;
            }
            throw new RuntimeException(e);
        }
    }

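    /** Creates the topic unless it already exists, waiting up to 2 minutes for the creation to complete. */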
    protected void createTopic(String topic, int partitions) {
        if (topicExists(topic)) {
            System.out.println("Existing topic " + topic);
            return;
        }
        try (AdminClient adminClient = AdminClient.create(getAdminProperties())) {
            CreateTopicsResult ret = adminClient.createTopics(
                    Collections.singletonList(new NewTopic(topic, partitions, DEFAULT_REPLICATION)));
            ret.all().get(2, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException("Unable to create topic " + topic + " within the timeout", e);
        }
        System.out.println("CREATE topic " + topic);
    }

    protected void deleteTopic(String topic) {
        try (AdminClient adminClient = AdminClient.create(getAdminProperties())) {
            adminClient.deleteTopics(Collections.singleton(topic));
            System.out.println("DELETE topic " + topic);
        }
    }

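    /** Producer callback that logs the record metadata on success and prints the exception on failure. */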
    protected static class TestCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                // recordMetadata may be null when the send fails
                System.out.println("Error while producing message to topic: " + recordMetadata);
                e.printStackTrace();
            } else {
                System.out.printf("Message appended to topic: %s partition:%s offset:%s%n",
                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
            }
        }
    }
}
@emaxerrno

@bdelbosc - btw, starting at 1 is no longer the case: we now match the upstream Kafka offset management exactly.
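
For anyone landing here later, a quick way to double-check this against a current broker is to read the beginning and end offsets of a fresh topic. Here is a minimal sketch, assuming a single-partition topic already exists on the broker; the `check-offsets` topic name, the `localhost:9092` address, and the `offset-check` group id are placeholders:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class CheckBeginningOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-check");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // "check-offsets" is a placeholder: any fresh single-partition topic works
            TopicPartition tp = new TopicPartition("check-offsets", 0);
            Map<TopicPartition, Long> beginning = consumer.beginningOffsets(Collections.singleton(tp));
            Map<TopicPartition, Long> end = consumer.endOffsets(Collections.singleton(tp));
            // On brokers matching upstream Kafka semantics, a fresh topic starts at offset 0,
            // and the end offset equals the number of records produced so far
            System.out.println("Beginning offset: " + beginning.get(tp) + ", end offset: " + end.get(tp));
        }
    }
}
```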
