Skip to content

Instantly share code, notes, and snippets.

@praveen2710
Last active November 9, 2020 16:47
Show Gist options
  • Save praveen2710/7dcf1671379ee6db4581436e1225c673 to your computer and use it in GitHub Desktop.
Save praveen2710/7dcf1671379ee6db4581436e1225c673 to your computer and use it in GitHub Desktop.
Setting up Kafka in Legacy Spring Web application
package com.sample.service.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class ConsumerConfiguration {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:8080");
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
"test");
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public KafkaConsumerStarter listener() {
return new KafkaConsumerStarter();
}
}
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
@Component
public class KafkaConsumerStarter {
private final List<String> messages = new ArrayList<>();
@KafkaListener(topics = "topic1",groupId = "test")
public void listen(String message){
System.out.println(message);
messages.add(message);
}
public List<String> getMessages() {
return messages;
}
}
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.4.RELEASE</version>
</dependency>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-4.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.1.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.1.xsd">
<!-- comoponenet scan should be the pacakge where the config file resides -->
<context:component-scan base-package="com.sample.service.config"/>
<bean class="com.sample.service.config.ConsumerConfiguration"/>
</beans>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment