Skip to content

Instantly share code, notes, and snippets.

@codegard
Created May 27, 2019 22:44
Show Gist options
  • Save codegard/af21c73d3821312593d9da5cd069ff9c to your computer and use it in GitHub Desktop.
Save codegard/af21c73d3821312593d9da5cd069ff9c to your computer and use it in GitHub Desktop.
package com.rataj.kafka.config;
import com.rataj.kafka.model.Person;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
/**
 * Spring configuration that wires up the Kafka consumer side for {@link Person} messages.
 *
 * <p>Declares two beans: a {@link ConsumerFactory} with {@code String} keys and
 * {@code Person} values, and the listener container factory built on top of it.
 */
@Configuration
public class KafkaConsumerConfiguration {

    /**
     * Creates the consumer factory for records keyed by {@code String} with {@code Person} values.
     *
     * <p>Keys are read with a {@link StringDeserializer}. Values are read with a
     * {@link JsonDeserializer} targeting {@code Person}, which is handed to an
     * {@link ErrorHandlingDeserializer2} as its delegate.
     *
     * @return a {@link DefaultKafkaConsumerFactory} built from the consumer properties and
     *         the two deserializer instances
     */
    @Bean
    ConsumerFactory<String, Person> consumerFactory() {
        JsonDeserializer<Person> personJsonDeserializer = new JsonDeserializer<>(Person.class);
        ErrorHandlingDeserializer2<Person> valueDeserializer =
                new ErrorHandlingDeserializer2<>(personJsonDeserializer);
        StringDeserializer keyDeserializer = new StringDeserializer();

        return new DefaultKafkaConsumerFactory<>(consumerProperties(), keyDeserializer, valueDeserializer);
    }

    /**
     * Creates the listener container factory used for Kafka listeners.
     *
     * <p>The factory is given the supplied consumer factory, a concurrency of 2 and a
     * {@code KafkaErrorHandler} instance as its error handler.
     *
     * @param consumerFactory the consumer factory the containers should use
     * @return the configured {@link ConcurrentKafkaListenerContainerFactory}
     */
    @Bean
    KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(ConsumerFactory<String, Person> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, Person> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setErrorHandler(new KafkaErrorHandler());
        factory.setConcurrency(2);
        return factory;
    }

    /** Assembles the raw Kafka consumer settings passed to the consumer factory. */
    private static Map<String, Object> consumerProperties() {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", "127.0.0.1:21345");
        config.put("group.id", "group");
        config.put("session.timeout.ms", "60000");
        config.put("enable.auto.commit", false);
        // NOTE(review): auto-commit is disabled above, so this interval looks unused;
        // kept as-is to leave the emitted configuration unchanged - confirm intent.
        config.put("auto.commit.interval.ms", "10");
        return config;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment