@nddipiazza · Created July 11, 2022
Kafka with Testcontainers
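
The test reads its broker topology from src/test/resources/kafka-docker/zk-single-kafka-single.yml, which is not included in the gist. The service names and ports there have to line up with the ZOOKEEPER/KAFKA and ZK_PORT/KAFKA_PORT constants in the test. A minimal sketch of such a file, assuming the Confluent images and a fixed 9092 host mapping so the advertised listener stays reachable from the test JVM (image tags and listener wiring are illustrative, not taken from the gist):

version: '2.1'
services:
  zoo1:
    image: confluentinc/cp-zookeeper:7.2.1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka1:
    image: confluentinc/cp-kafka:7.2.1
    depends_on:
      - zoo1
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1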
package org.apache.tika.pipes.kafka.tests;

import java.io.File;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
public class TikaPipesKafkaTest {

    private static final Logger LOG = LoggerFactory.getLogger(TikaPipesKafkaTest.class);

    public static final int ZK_PORT = 2181;
    public static final int KAFKA_PORT = 9092;
    public static final String KAFKA = "kafka1";
    public static final String ZOOKEEPER = "zoo1";

    // Starts the compose stack once for the whole test class. The service names
    // and ports must match the services declared in zk-single-kafka-single.yml.
    @ClassRule
    public static DockerComposeContainer<?> environment =
            new DockerComposeContainer<>(new File("src/test/resources/kafka-docker/zk-single-kafka-single.yml"))
                    .withExposedService(KAFKA, KAFKA_PORT)
                    .withExposedService(ZOOKEEPER, ZK_PORT)
                    .withLogConsumer(ZOOKEEPER, new Slf4jLogConsumer(LOG))
                    .withLogConsumer(KAFKA, new Slf4jLogConsumer(LOG));
    @Test
    public void testTikaPipesKafka() throws Exception {
        // Testcontainers maps the container port to a random host port, so the
        // bootstrap address must be resolved via getServiceHost/getServicePort.
        String bootstrapServers = environment.getServiceHost(KAFKA, KAFKA_PORT)
                + ":" + environment.getServicePort(KAFKA, KAFKA_PORT);

        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "producer");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "grp");
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        String topic = "nick";
        ExecutorService es = Executors.newCachedThreadPool();
        // Keys that have been produced but not yet seen by the consumer.
        Map<Integer, Boolean> waiting = new ConcurrentHashMap<>();
        AtomicBoolean doneEmit = new AtomicBoolean(false);
        // Consumer task: keeps polling until the producer has finished and
        // every produced key has been removed from the waiting map.
        Future<?> f = es.submit(() -> {
            int numRec = 0;
            try (KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps)) {
                consumer.subscribe(Collections.singletonList(topic));
                while (!doneEmit.get() || !waiting.isEmpty()) {
                    try {
                        ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(1000));
                        for (ConsumerRecord<Integer, String> record : records) {
                            waiting.remove(record.key());
                            LOG.info("Received message: ({}, {}) at offset {}", record.key(), record.value(), record.offset());
                            ++numRec;
                        }
                        Thread.sleep(500L);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            }
            LOG.info("Consumer is now complete. NumRec={}", numRec);
        });
        // Give the consumer time to subscribe and join its group before producing.
        Thread.sleep(8000L);

        // Producer task: sends 99 keyed messages, registering each key in the
        // waiting map before the send so the consumer can tick it off.
        es.submit(() -> {
            int numSent = 0;
            try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps)) {
                for (int messageNo = 1; messageNo < 100; ++messageNo) {
                    String messageStr = "Message_" + messageNo;
                    try {
                        waiting.put(messageNo, true);
                        producer.send(new ProducerRecord<>(topic, messageNo, messageStr)).get();
                        LOG.info("Sent message: ({}, {})", messageNo, messageStr);
                        ++numSent;
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
            doneEmit.set(true);
            LOG.info("Producer is now complete - sent {}.", numSent);
        });

        // Block until the consumer has drained every produced message.
        f.get();
        es.shutdown();
        LOG.info("Done");
    }
}
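
If only a single broker is needed, the compose file can be dropped entirely in favor of the dedicated Kafka module that Testcontainers provides (the org.testcontainers:kafka artifact). A sketch of that variant, assuming that artifact is on the test classpath; the class name and image tag are illustrative:

import org.junit.ClassRule;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public class TikaPipesKafkaSingleBrokerTest {

    // One container runs both the broker and its embedded ZooKeeper;
    // KafkaContainer handles the advertised-listener wiring itself.
    @ClassRule
    public static KafkaContainer kafka =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.2.1"));
}

The client setup then shrinks to kafka.getBootstrapServers() in place of the getServiceHost/getServicePort concatenation used above.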