Skip to content

Instantly share code, notes, and snippets.

@sheelprabhakar
Last active March 6, 2021 09:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sheelprabhakar/8b98c77f3080dbc52a08bda43da2b554 to your computer and use it in GitHub Desktop.
Save sheelprabhakar/8b98c77f3080dbc52a08bda43da2b554 to your computer and use it in GitHub Desktop.
Spring boot kafka consumer config
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value( "${kafka.bootstrapAddress}" )
private String bootstrapAddress;
@Value( "${kafka.groupId}" )
private String groupId;
@Bean
public ConsumerFactory<String, WeatherInfo> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
groupId);
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonDeserializer.class);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
JsonDeserializer<WeatherInfo> deserializer = new JsonDeserializer<>(WeatherInfo.class);
deserializer.setRemoveTypeHeaders(false);
deserializer.addTrustedPackages("*");
deserializer.setUseTypeMapperForKey(true);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, WeatherInfo>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, WeatherInfo> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
return factory;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment