See https://gist.github.com/zlaval/13d1bd07363e4eb71bcccb3a5a9bf7a0 for Spring Cloud Stream Kafka declarative configuration.
Last active
December 8, 2023 15:50
-
-
Save pavelfomin/9ffa07cde6d0f3d138b248077d3303cb to your computer and use it in GitHub Desktop.
Spring Kafka configuration for consuming JSON and Avro formats using @KafkaListener.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import lombok.RequiredArgsConstructor; | |
import lombok.extern.slf4j.Slf4j; | |
import org.apache.kafka.clients.consumer.ConsumerRecord; | |
import org.springframework.kafka.annotation.KafkaListener; | |
import org.springframework.messaging.MessageHeaders; | |
@RequiredArgsConstructor | |
@Slf4j | |
public class AvroDataConsumer { | |
@KafkaListener( | |
containerFactory = "avroKafkaListenerContainerFactory", | |
topics = {"${app.kafka.topics.json}"} | |
) | |
public void consume(ConsumerRecord<String, Object> record, MessageHeaders headers) { | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import lombok.RequiredArgsConstructor; | |
import lombok.extern.slf4j.Slf4j; | |
import org.apache.kafka.clients.consumer.ConsumerRecord; | |
import org.springframework.kafka.annotation.KafkaListener; | |
import org.springframework.messaging.MessageHeaders; | |
@RequiredArgsConstructor | |
@Slf4j | |
public class JsonDataConsumer { | |
@KafkaListener( | |
containerFactory = "jsonKafkaListenerContainerFactory", | |
topics = {"${app.kafka.topics.avro}"} | |
) | |
public void consume(ConsumerRecord<String, Object> record, MessageHeaders headers) { | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import lombok.Generated; | |
import lombok.RequiredArgsConstructor; | |
import org.apache.kafka.clients.consumer.ConsumerConfig; | |
import org.springframework.beans.factory.annotation.Qualifier; | |
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; | |
import org.springframework.boot.autoconfigure.kafka.KafkaProperties; | |
import org.springframework.boot.context.properties.EnableConfigurationProperties; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; | |
import org.springframework.kafka.core.ConsumerFactory; | |
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; | |
import org.springframework.kafka.support.serializer.JsonDeserializer; | |
import java.util.Map; | |
@Configuration | |
@RequiredArgsConstructor | |
@EnableConfigurationProperties(KafkaProperties.class) | |
public class KafkaConfiguration { | |
private final KafkaProperties kafkaProperties; | |
@ConditionalOnMissingBean // use avroKafkaConsumerFactory from test context if available | |
@Generated // exclude from coverage since another bean is used by the integration tests | |
@Bean | |
ConsumerFactory<?, ?> avroKafkaConsumerFactory() { | |
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()); | |
} | |
/** | |
* The bean name kafkaListenerContainerFactory is used by {@link org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration#kafkaListenerContainerFactory KafkaAnnotationDrivenConfiguration.kafkaListenerContainerFactory} | |
* to create a default `ConcurrentKafkaListenerContainerFactory` bean which uses (and requires a single) default `ConsumerFactory`. | |
* By providing the alias name, no additional default instance of `ConcurrentKafkaListenerContainerFactory` is created. | |
*/ | |
@Bean(name = {"avroKafkaListenerContainerFactory", "kafkaListenerContainerFactory"}) | |
ConcurrentKafkaListenerContainerFactory<?, ?> avroKafkaListenerContainerFactory( | |
@Qualifier("avroKafkaConsumerFactory") ConsumerFactory<? super Object, ? super Object> avroKafkaConsumerFactory) { | |
ConcurrentKafkaListenerContainerFactory<?, ?> factory = new ConcurrentKafkaListenerContainerFactory<>(); | |
factory.setConsumerFactory(avroKafkaConsumerFactory); | |
return factory; | |
} | |
@Bean | |
ConsumerFactory<?, ?> jsonKafkaConsumerFactory() { | |
Map<String, Object> configs = kafkaProperties.buildConsumerProperties(); | |
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName()); | |
return new DefaultKafkaConsumerFactory<>(configs); | |
} | |
@Bean(name = "jsonKafkaListenerContainerFactory") | |
ConcurrentKafkaListenerContainerFactory<?, ?> jsonKafkaListenerContainerFactory( | |
@Qualifier("jsonKafkaConsumerFactory") ConsumerFactory<? super Object, ? super Object> jsonKafkaConsumerFactory) { | |
ConcurrentKafkaListenerContainerFactory<?, ?> factory = new ConcurrentKafkaListenerContainerFactory<>(); | |
factory.setConsumerFactory(jsonKafkaConsumerFactory); | |
return factory; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient | |
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient | |
import io.confluent.kafka.serializers.KafkaAvroDeserializer | |
import io.confluent.kafka.serializers.KafkaAvroSerializer | |
import org.apache.kafka.clients.producer.ProducerConfig | |
import org.apache.kafka.common.serialization.StringDeserializer | |
import org.apache.kafka.common.serialization.StringSerializer | |
import org.springframework.boot.autoconfigure.kafka.KafkaProperties | |
import org.springframework.boot.context.properties.EnableConfigurationProperties | |
import org.springframework.context.annotation.Bean | |
import org.springframework.context.annotation.Configuration | |
import org.springframework.context.annotation.Primary | |
import org.springframework.kafka.core.ConsumerFactory | |
import org.springframework.kafka.core.DefaultKafkaConsumerFactory | |
import org.springframework.kafka.core.DefaultKafkaProducerFactory | |
import org.springframework.kafka.core.KafkaTemplate | |
import org.springframework.kafka.support.serializer.JsonSerializer | |
/**
 * Test-side Kafka wiring: a mock schema registry client, Avro serializer/deserializer beans built
 * on it, a primary {@code avroKafkaConsumerFactory}, and producer factories plus templates for
 * Avro and JSON payloads.
 */
@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
class TestKafkaConfiguration {

    private final KafkaProperties kafkaProperties

    TestKafkaConfiguration(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties
    }

    /** Schema registry client backed by {@link MockSchemaRegistryClient}. */
    @Bean
    SchemaRegistryClient schemaRegistryClient() {
        new MockSchemaRegistryClient()
    }

    /** Avro serializer created from the schema registry client bean and the producer properties. */
    @Bean
    KafkaAvroSerializer kafkaAvroSerializer() {
        Map<String, Object> producerProps = kafkaProperties.buildProducerProperties()
        new KafkaAvroSerializer(schemaRegistryClient(), producerProps)
    }

    /** Avro deserializer created from the schema registry client bean and the consumer properties. */
    @Bean
    KafkaAvroDeserializer kafkaAvroDeserializer() {
        Map<String, Object> consumerProps = kafkaProperties.buildConsumerProperties()
        new KafkaAvroDeserializer(schemaRegistryClient(), consumerProps)
    }

    /**
     * Primary consumer factory: String keys, values read through the {@code kafkaAvroDeserializer} bean.
     */
    @Bean
    @Primary
    ConsumerFactory<?, ?> avroKafkaConsumerFactory() {
        def keyDeserializer = new StringDeserializer()
        new DefaultKafkaConsumerFactory(
                kafkaProperties.buildConsumerProperties(),
                keyDeserializer,
                kafkaAvroDeserializer()
        )
    }

    /** Producer factory: String keys, values written through the {@code kafkaAvroSerializer} bean. */
    @Bean
    DefaultKafkaProducerFactory<?, ?> avroKafkaProducerFactory() {
        def keySerializer = new StringSerializer()
        new DefaultKafkaProducerFactory(
                kafkaProperties.buildProducerProperties(),
                keySerializer,
                kafkaAvroSerializer()
        )
    }

    /** Producer factory whose value serializer is overridden to {@link JsonSerializer}. */
    @Bean
    DefaultKafkaProducerFactory<?, ?> jsonKafkaProducerFactory() {
        Map<String, Object> producerProps = kafkaProperties.buildProducerProperties()
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName())
        new DefaultKafkaProducerFactory<>(producerProps)
    }

    /** Template that sends through {@code avroKafkaProducerFactory}. */
    @Bean
    KafkaTemplate<?, ?> avroKafkaTemplate() {
        new KafkaTemplate<>(avroKafkaProducerFactory())
    }

    /** Template that sends through {@code jsonKafkaProducerFactory}. */
    @Bean
    KafkaTemplate<?, ?> jsonKafkaTemplate() {
        new KafkaTemplate<>(jsonKafkaProducerFactory())
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment