| @Configuration | |
| @EnableKafka | |
| public class ReceiverConfigNotConfluent implements ReceiverConfig { | |
| @Value("${spring.kafka.bootstrap-servers}") | |
| private String bootstrapServers; | |
| @Value("${spring.kafka.consumer.group-id}") | |
| private String groupId; | |
| @Value("${spring.kafka.consumer.auto-offset-reset}") | |
| private String autoOffsetReset; | |
| @Override | |
| @Bean | |
| public Map<String, Object> consumerConfigs() { | |
| Map<String, Object> props = new HashMap<>(); | |
| props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); | |
| props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); | |
| props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); | |
| props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); | |
| props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); | |
| return props; | |
| } | |
| @Override | |
| @Bean | |
| public ConsumerFactory<String, String> consumerFactory() { | |
| return new DefaultKafkaConsumerFactory<>(consumerConfigs(), | |
| new StringDeserializer(), | |
| new StringDeserializer() | |
| ); | |
| } | |
| @Bean | |
| ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { | |
| ConcurrentKafkaListenerContainerFactory<String, String> factory = | |
| new ConcurrentKafkaListenerContainerFactory<>(); | |
| factory.setConsumerFactory(consumerFactory()); | |
| factory.setMessageConverter(new StringJsonMessageConverter()); | |
| return factory; | |
| } | |
| @Override | |
| @Bean | |
| public Receiver receiver() { | |
| return new Receiver(); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment