Skip to content

Instantly share code, notes, and snippets.

@lburgazzoli
Created March 11, 2022 10:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lburgazzoli/d782736e1ec1c0d912cd0004361a8cc6 to your computer and use it in GitHub Desktop.
Save lburgazzoli/d782736e1ec1c0d912cd0004361a8cc6 to your computer and use it in GitHub Desktop.
package org.bf2.cos.connector.camel.it;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import io.strimzi.test.container.StrimziKafkaContainer;
import static org.assertj.core.api.Assertions.assertThat;
public class KafkaIT {
@Test
public void test() throws Exception {
try (StrimziKafkaContainer kafka = new StrimziKafkaContainer()) {
kafka.withKafkaVersion("2.8.1");
kafka.withBrokerId(1);
kafka.waitForRunning();
kafka.withStartupTimeout(Duration.ofSeconds(30));
kafka.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("kafka")));
kafka.start();
KafkaAdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()))
.createTopics(List.of(new NewTopic("foo", 1, (short) 1)))
.all()
.get(30, TimeUnit.SECONDS);
var pr = send(kafka, "foo", "bar", "baz");
assertThat(pr.hasOffset()).isTrue();
assertThat(pr.offset()).isNotEqualTo(-1);
var rec = read(kafka, "foo");
assertThat(rec).isNotEmpty();
}
}
public static RecordMetadata send(StrimziKafkaContainer container, String topic, String key, String value) {
Properties config = new Properties();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, container.getBootstrapServers());
config.put(ProducerConfig.CLIENT_ID_CONFIG, container.getContainerId());
config.put(ProducerConfig.ACKS_CONFIG, "all");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
try (var kp = new KafkaProducer<String, String>(config)) {
return kp.send(new ProducerRecord<>(topic, key, value)).get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static ConsumerRecords<String, String> read(StrimziKafkaContainer container, String topic) {
Properties config = new Properties();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, container.getBootstrapServers());
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
try (var kp = new KafkaConsumer<String, String>(config)) {
kp.subscribe(List.of(topic));
return kp.poll(Duration.ofSeconds(30));
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment