Skip to content

Instantly share code, notes, and snippets.

@codegard
Created May 27, 2019 22:35
Show Gist options
  • Save codegard/2246c031f28be930e66a7a03fa157167 to your computer and use it in GitHub Desktop.
Save codegard/2246c031f28be930e66a7a03fa157167 to your computer and use it in GitHub Desktop.
package com.rataj.kafka.config;
import com.rataj.kafka.model.Person;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
/**
 * Spring configuration that declares the Kafka consumer beans for {@link Person} messages:
 * a {@link ConsumerFactory} with String keys and JSON-decoded {@code Person} values, and the
 * listener container factory built on top of it.
 */
@Configuration
public class KafkaConsumerConfiguration {

    /**
     * Builds the consumer factory from a fixed set of consumer properties.
     *
     * <p>Keys are read with a {@link StringDeserializer}; values are read with a
     * {@link JsonDeserializer} targeting {@link Person}.
     *
     * @return a consumer factory for {@code String} keys and {@code Person} values
     */
    @Bean
    ConsumerFactory<String, Person> consumerFactory() {
        Map<String, Object> consumerConfig = new HashMap<>();
        // Connection and group membership.
        consumerConfig.put("bootstrap.servers", "127.0.0.1:21345");
        consumerConfig.put("group.id", "group");
        consumerConfig.put("session.timeout.ms", "60000");
        // Offset committing: auto commit is switched off here.
        // NOTE(review): the interval below presumably has no effect while
        // enable.auto.commit is false - confirm against the Kafka consumer docs.
        consumerConfig.put("enable.auto.commit", false);
        consumerConfig.put("auto.commit.interval.ms", "10");

        StringDeserializer keyDeserializer = new StringDeserializer();
        JsonDeserializer<Person> valueDeserializer = new JsonDeserializer<>(Person.class);
        return new DefaultKafkaConsumerFactory<>(consumerConfig, keyDeserializer, valueDeserializer);
    }

    /**
     * Builds the listener container factory backed by the given consumer factory, with a
     * concurrency of 2 and a {@code KafkaErrorHandler} as its error handler.
     *
     * <p>NOTE(review): {@code KafkaErrorHandler} is not imported in this file, so it is
     * presumably a class in this same package - verify.
     *
     * @param consumerFactory the consumer factory the created containers will use
     * @return the configured listener container factory
     */
    @Bean
    KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(ConsumerFactory<String, Person> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, Person> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory);
        containerFactory.setConcurrency(2);
        containerFactory.setErrorHandler(new KafkaErrorHandler());
        return containerFactory;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment