package servicetests
import com.example.kafkaFirstConsumer.DemoApplication
import com.example.kafkaFirstConsumer.service.MyAckConsumer
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.junit.ClassRule
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.util.TestPropertyValues
import org.springframework.context.ApplicationContextInitializer
import org.springframework.context.ConfigurableApplicationContext
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer
import org.springframework.kafka.listener.ContainerProperties
import org.springframework.kafka.test.utils.KafkaTestUtils
import org.springframework.test.context.ContextConfiguration
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.spock.Testcontainers
import org.testcontainers.utility.DockerImageName
import spock.lang.Shared
import spock.lang.Specification

@Testcontainers
@ContextConfiguration(initializers = [Initializer.class])
@SpringBootTest(classes = [DemoApplication.class])
class ConsumeEventsSpec extends Specification {

    String topic = 'test-Topic'
    ConcurrentMessageListenerContainer<String, String> container1
    ConcurrentMessageListenerContainer<String, String> container2

    @Shared
    @ClassRule
    public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse('confluentinc/cp-kafka:5.3.0'))

    static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
        void initialize(ConfigurableApplicationContext configurableApplicationContext) {
            TestPropertyValues.of(
                    'spring.kafka.bootstrap-servers=' + kafka.getBootstrapServers())
                    .applyTo(configurableApplicationContext.getEnvironment())
        }
    }

    def setupSpec() {
        kafka.start()
    }

    def 'test order consumer is able to consume messages'() {
        given: 'a kafka template'
        def configs = new HashMap(KafkaTestUtils.producerProps(kafka.getBootstrapServers()))
        def factory = new DefaultKafkaProducerFactory<String, String>(configs, new StringSerializer(), new StringSerializer())
        def template = new KafkaTemplate<String, String>(factory, true)

        and: 'two producer records'
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, '1', 'Test 123')
        ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, '2', 'Test 123')

        and: 'two consumers'
        Map<String, Object> props = setBasicProperties()
        def testCons1 = createMyConsumer1(props as Properties, consumerFactory(props as Properties))
        def testCons2 = createMyConsumer2(props as Properties, consumerFactory(props as Properties))

        when: 'sending messages to kafka'
        Thread.sleep(5000) // give the two listener containers time to join their groups before producing
        template.send(record).get()
        template.send(record).get()
        template.send(record2).get()
        template.send(record2).get()
        template.send(record).get()

        then: 'the messages are consumed and acknowledged successfully'
        KafkaTestUtils.getEndOffsets(testCons1 as Consumer<String, String>, topic, 0).get(new TopicPartition(topic, 0)) == 2
        KafkaTestUtils.getEndOffsets(testCons2 as Consumer<String, String>, topic, 1).get(new TopicPartition(topic, 1)) == 3
    }

    Map<String, Object> setBasicProperties() {
        [
                (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG): kafka.getBootstrapServers(),
                (ConsumerConfig.GROUP_INSTANCE_ID_CONFIG): 'rich1',
                (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG): 'earliest'
        ]
    }

    ConsumerFactory consumerFactory(Properties props) {
        return new DefaultKafkaConsumerFactory<>(props as Map<String, Object>, new StringDeserializer(), new StringDeserializer())
    }

    Consumer<String, String> createMyConsumer1(Properties props, ConsumerFactory consumerFactory) {
        container1 = new ConcurrentMessageListenerContainer(consumerFactory, containerProperties('gruppe-1', topic))
        container1.setConcurrency(1)
        container1.start()
        return consumerFactory.createConsumer('richSuffix') // extra consumer, used only to assert end offsets
    }

    Consumer<String, String> createMyConsumer2(Properties props, ConsumerFactory consumerFactory) {
        container2 = new ConcurrentMessageListenerContainer(consumerFactory, containerProperties('gruppe-2', topic))
        container2.setConcurrency(1)
        container2.start()
        return consumerFactory.createConsumer('richSuffix2') // extra consumer, used only to assert end offsets
    }

    ContainerProperties containerProperties(String group, String topic) {
        def consumer1 = new MyAckConsumer()
        ContainerProperties containerProps = new ContainerProperties(topic)
        containerProps.messageListener = consumer1
        containerProps.ackMode = ContainerProperties.AckMode.MANUAL // MyAckConsumer acknowledges each record itself
        containerProps.groupId = group
        return containerProps
    }
}
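
// Hypothetical alternative assertion, sketched in Java rather than Groovy: instead of comparing
// end offsets per partition, a throwaway consumer could read the topic back and assert on the
// records themselves. The helper class name, the group id "offset-check" and the single 10-second
// poll (assumed long enough to fetch everything produced above) are assumptions, not part of the
// original spec.
import java.time.Duration;
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.test.utils.KafkaTestUtils;

class RecordCountCheck {

    static long countRecords(String bootstrapServers, String topic) {
        // consumerProps(brokers, group, autoCommit) builds a minimal consumer config
        Map<String, Object> props = KafkaTestUtils.consumerProps(bootstrapServers, "offset-check", "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // read the topic from the beginning
        try (KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer())) {
            consumer.subscribe(Collections.singletonList(topic));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            return records.count(); // the test above produces five records in total
        }
    }
}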

package com.example.kafkaFirstConsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

@EnableKafka
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

package com.example.kafkaFirstConsumer.service;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;

public class MyAckConsumer implements AcknowledgingMessageListener<String, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyAckConsumer.class);

    @Override
    public void onMessage(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
        LOGGER.info(consumerRecord.toString());
        acknowledgment.acknowledge(); // manual offset commit, required because the container uses AckMode.MANUAL
    }

    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        // fallback for containers that do not pass an Acknowledgment
        LOGGER.info(data.toString());
    }
}
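
// Hypothetical production wiring, sketched in Java: the test above builds its listener containers
// by hand, but the same MyAckConsumer could also be registered through a Spring-managed container
// bean. The class name ListenerContainerConfig, the hard-coded bootstrap address and the group id
// "gruppe-1" are assumptions for illustration, not part of the original gist.
package com.example.kafkaFirstConsumer.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

import com.example.kafkaFirstConsumer.service.MyAckConsumer;

@Configuration
public class ListenerContainerConfig {

    @Bean
    public ConsumerFactory<String, String> ackConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption; normally taken from configuration
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer());
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> ackListenerContainer(
            ConsumerFactory<String, String> ackConsumerFactory) {
        ContainerProperties containerProps = new ContainerProperties("test-Topic");
        containerProps.setMessageListener(new MyAckConsumer());
        containerProps.setAckMode(ContainerProperties.AckMode.MANUAL); // the listener calls acknowledge() itself
        containerProps.setGroupId("gruppe-1");
        ConcurrentMessageListenerContainer<String, String> container =
                new ConcurrentMessageListenerContainer<>(ackConsumerFactory, containerProps);
        container.setConcurrency(1); // one consumer thread, matching the test setup
        return container;
    }
}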

package com.example.kafkaFirstConsumer.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class MyConfig {

    @Bean
    public NewTopic topicExample() {
        // two partitions, because the test asserts end offsets for partition 0 and partition 1 separately
        return TopicBuilder.name("test-Topic")
                .partitions(2)
                .replicas(1)
                .build();
    }
}
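
// Note on topic creation: the NewTopic bean above only takes effect because Spring Boot
// auto-configures a KafkaAdmin from spring.kafka.bootstrap-servers (supplied by the test's
// Initializer). Outside Boot auto-configuration, a KafkaAdmin bean would have to be declared
// explicitly; a minimal sketch, with the class name and bootstrap address as assumptions:
package com.example.kafkaFirstConsumer.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
public class AdminConfig {

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption; normally taken from configuration
        return new KafkaAdmin(configs); // creates the NewTopic beans on context refresh
    }
}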