@Configuration | |
@EnableKafka | |
public class ReceiverConfig { | |
@Value("${spring.kafka.bootstrap-servers}") | |
private String bootstrapServers; | |
@Value("${spring.kafka.consumer.group-id}") | |
private String groupId; | |
@Value("${spring.kafka.consumer.auto-offset-reset}") | |
private String autoOffsetReset; | |
@Bean | |
public Map<String, Object> consumerConfigs() { | |
Map<String, Object> props = new HashMap<>(); | |
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); | |
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); | |
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); | |
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); | |
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); | |
return props; | |
} | |
@Bean | |
public ConsumerFactory<String, String> consumerFactory() { | |
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), | |
new StringDeserializer()); | |
} | |
@Bean | |
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { | |
ConcurrentKafkaListenerContainerFactory<String, String> factory = | |
new ConcurrentKafkaListenerContainerFactory<>(); | |
factory.setConsumerFactory(consumerFactory()); | |
factory.setMessageConverter(new StringJsonMessageConverter()); | |
return factory; | |
} | |
@Bean | |
public Receiver receiver() { | |
return new Receiver(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment