@pavelfomin
Last active December 8, 2023 15:50
Spring Kafka configuration for consuming JSON and Avro formats using @KafkaListener. The ${app.kafka.topics.avro} and ${app.kafka.topics.json} placeholders are expected to resolve to topic names defined in the application configuration (e.g. application.yml).
// AvroDataConsumer.java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

@Component // must be registered as a Spring bean for the @KafkaListener to be picked up
@RequiredArgsConstructor
@Slf4j
public class AvroDataConsumer {

    @KafkaListener(
            containerFactory = "avroKafkaListenerContainerFactory",
            topics = {"${app.kafka.topics.avro}"}
    )
    public void consume(ConsumerRecord<String, Object> record, MessageHeaders headers) {
        // the Avro payload is deserialized by the value deserializer configured for avroKafkaConsumerFactory
        log.info("Received Avro record with key {} from topic {}", record.key(), record.topic());
    }
}
// JsonDataConsumer.java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

@Component // must be registered as a Spring bean for the @KafkaListener to be picked up
@RequiredArgsConstructor
@Slf4j
public class JsonDataConsumer {

    @KafkaListener(
            containerFactory = "jsonKafkaListenerContainerFactory",
            topics = {"${app.kafka.topics.json}"}
    )
    public void consume(ConsumerRecord<String, Object> record, MessageHeaders headers) {
        // the JSON payload is deserialized by the JsonDeserializer configured in jsonKafkaConsumerFactory
        log.info("Received JSON record with key {} from topic {}", record.key(), record.topic());
    }
}
// KafkaConfiguration.java
import lombok.Generated;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.Map;

@Configuration
@RequiredArgsConstructor
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConfiguration {

    private final KafkaProperties kafkaProperties;

    @ConditionalOnMissingBean // use the avroKafkaConsumerFactory from the test context if one is available
    @Generated // exclude from coverage since another bean is used by the integration tests
    @Bean
    ConsumerFactory<?, ?> avroKafkaConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
    }

    /**
     * The bean name {@code kafkaListenerContainerFactory} is used by
     * {@link org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration#kafkaListenerContainerFactory
     * KafkaAnnotationDrivenConfiguration.kafkaListenerContainerFactory} to create a default
     * {@code ConcurrentKafkaListenerContainerFactory} bean, which uses (and requires a single) default {@code ConsumerFactory}.
     * Registering this bean under that name as an alias prevents an additional default
     * {@code ConcurrentKafkaListenerContainerFactory} from being created.
     */
    @Bean(name = {"avroKafkaListenerContainerFactory", "kafkaListenerContainerFactory"})
    ConcurrentKafkaListenerContainerFactory<?, ?> avroKafkaListenerContainerFactory(
            @Qualifier("avroKafkaConsumerFactory") ConsumerFactory<? super Object, ? super Object> avroKafkaConsumerFactory) {

        ConcurrentKafkaListenerContainerFactory<?, ?> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(avroKafkaConsumerFactory);
        return factory;
    }

    @Bean
    ConsumerFactory<?, ?> jsonKafkaConsumerFactory() {
        Map<String, Object> configs = kafkaProperties.buildConsumerProperties();
        // override only the value deserializer; all other consumer settings come from spring.kafka.* properties
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
        return new DefaultKafkaConsumerFactory<>(configs);
    }

    @Bean(name = "jsonKafkaListenerContainerFactory")
    ConcurrentKafkaListenerContainerFactory<?, ?> jsonKafkaListenerContainerFactory(
            @Qualifier("jsonKafkaConsumerFactory") ConsumerFactory<? super Object, ? super Object> jsonKafkaConsumerFactory) {

        ConcurrentKafkaListenerContainerFactory<?, ?> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(jsonKafkaConsumerFactory);
        return factory;
    }
}
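
As written, avroKafkaConsumerFactory leaves the Avro value deserializer and schema registry settings to the application's spring.kafka.consumer.* properties. Below is a minimal sketch of a variant of that bean method that wires the Confluent Avro deserializer in code instead; the registry URL and the specific-reader flag are illustrative assumptions, not part of the gist.

// Additional imports this variant would need in KafkaConfiguration.java
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;

    @Bean
    ConsumerFactory<?, ?> avroKafkaConsumerFactory() {
        Map<String, Object> configs = kafkaProperties.buildConsumerProperties();
        // Explicit wiring instead of spring.kafka.consumer.* properties (assumed values below).
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        configs.put("schema.registry.url", "http://localhost:8081"); // assumed local schema registry
        configs.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); // deserialize into generated SpecificRecord types
        return new DefaultKafkaConsumerFactory<>(configs);
    }
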
// TestKafkaConfiguration.groovy
// Test context configuration: backs the Avro (de)serializers with a MockSchemaRegistryClient
// and exposes producer factories / templates for publishing Avro and JSON test records.
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Primary
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.support.serializer.JsonSerializer

@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
class TestKafkaConfiguration {

    private final KafkaProperties kafkaProperties

    TestKafkaConfiguration(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties
    }

    @Bean
    SchemaRegistryClient schemaRegistryClient() {
        return new MockSchemaRegistryClient()
    }

    @Bean
    KafkaAvroSerializer kafkaAvroSerializer() {
        return new KafkaAvroSerializer(schemaRegistryClient(), kafkaProperties.buildProducerProperties())
    }

    @Bean
    KafkaAvroDeserializer kafkaAvroDeserializer() {
        return new KafkaAvroDeserializer(schemaRegistryClient(), kafkaProperties.buildConsumerProperties())
    }

    // Takes precedence over the @ConditionalOnMissingBean avroKafkaConsumerFactory in KafkaConfiguration
    @Bean
    @Primary
    ConsumerFactory<?, ?> avroKafkaConsumerFactory() {
        return new DefaultKafkaConsumerFactory(
                kafkaProperties.buildConsumerProperties(),
                new StringDeserializer(),
                kafkaAvroDeserializer()
        )
    }

    @Bean
    DefaultKafkaProducerFactory<?, ?> avroKafkaProducerFactory() {
        return new DefaultKafkaProducerFactory(
                kafkaProperties.buildProducerProperties(),
                new StringSerializer(),
                kafkaAvroSerializer()
        )
    }

    @Bean
    DefaultKafkaProducerFactory<?, ?> jsonKafkaProducerFactory() {
        Map<String, Object> configs = kafkaProperties.buildProducerProperties()
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName())
        return new DefaultKafkaProducerFactory<>(configs)
    }

    @Bean
    KafkaTemplate<?, ?> avroKafkaTemplate() {
        return new KafkaTemplate<>(avroKafkaProducerFactory())
    }

    @Bean
    KafkaTemplate<?, ?> jsonKafkaTemplate() {
        return new KafkaTemplate<>(jsonKafkaProducerFactory())
    }
}
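
For context, a hypothetical integration test that publishes through the templates above. The topic names, the embedded broker, the Avro schema, and the test class itself are assumptions for illustration, not part of the gist; assertions on the listeners are omitted.

// DataConsumerIntegrationTest.java (hypothetical sketch, assumes spring-kafka-test is on the test classpath)
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;

import java.util.Map;

@SpringBootTest
// Embedded broker; topic names are assumed and would have to match app.kafka.topics.* in the test properties.
@EmbeddedKafka(topics = {"avro-topic", "json-topic"}, bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class DataConsumerIntegrationTest {

    // Field names match the bean names above; the beans are declared with wildcards, hence the casts before sending.
    @Autowired
    private KafkaTemplate<?, ?> avroKafkaTemplate;

    @Autowired
    private KafkaTemplate<?, ?> jsonKafkaTemplate;

    @Test
    @SuppressWarnings("unchecked")
    void publishesAvroAndJsonRecords() {
        // Illustrative ad-hoc schema; a real test would use the application's Avro schema.
        Schema schema = SchemaBuilder.record("Data").fields()
                .requiredString("id")
                .endRecord();
        GenericRecord avroPayload = new GenericRecordBuilder(schema).set("id", "42").build();

        ((KafkaTemplate<String, Object>) avroKafkaTemplate).send("avro-topic", "42", avroPayload);
        ((KafkaTemplate<String, Object>) jsonKafkaTemplate).send("json-topic", "42", Map.of("id", "42"));

        // Assertions on AvroDataConsumer / JsonDataConsumer (e.g. via a latch or a spy) are omitted here.
    }
}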