Skip to content

Instantly share code, notes, and snippets.

@garystafford
Created December 26, 2018 05:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save garystafford/64cd57d96af2659dcaa3ae1846c791da to your computer and use it in GitHub Desktop.
Save garystafford/64cd57d96af2659dcaa3ae1846c791da to your computer and use it in GitHub Desktop.
package com.storefront.config;
import com.storefront.kafka.Sender;
import com.storefront.model.CustomerChangeEvent;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
/**
 * Kafka producer wiring that is only active under the {@code gke} Spring profile.
 *
 * <p>Connection, SASL/SSL and timing settings are read from {@code spring.kafka.*}
 * properties and handed to the Kafka producer unchanged. Keys are serialized as
 * strings and values as JSON.
 *
 * <p>NOTE(review): the class name suggests this targets a Confluent-hosted cluster;
 * nothing in this class enforces that - confirm against the deployment configuration.
 */
@Profile("gke")
@Configuration
@EnableKafka
public class SenderConfigConfluent implements SenderConfig {

    /** Value of {@code spring.kafka.bootstrap-servers}. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /** Value of {@code spring.kafka.properties.ssl.endpoint.identification.algorithm}. */
    @Value("${spring.kafka.properties.ssl.endpoint.identification.algorithm}")
    private String sslEndpointIdentificationAlgorithm;

    /** Value of {@code spring.kafka.properties.sasl.mechanism}. */
    @Value("${spring.kafka.properties.sasl.mechanism}")
    private String saslMechanism;

    /** Value of {@code spring.kafka.properties.request.timeout.ms}, kept as the raw string. */
    @Value("${spring.kafka.properties.request.timeout.ms}")
    private String requestTimeoutMs;

    /** Value of {@code spring.kafka.properties.retry.backoff.ms}, kept as the raw string. */
    @Value("${spring.kafka.properties.retry.backoff.ms}")
    private String retryBackoffMs;

    /** Value of {@code spring.kafka.properties.security.protocol}. */
    @Value("${spring.kafka.properties.security.protocol}")
    private String securityProtocol;

    /** Value of {@code spring.kafka.properties.sasl.jaas.config}; holds credentials, so do not log it. */
    @Value("${spring.kafka.properties.sasl.jaas.config}")
    private String saslJaasConfig;

    /**
     * Assembles the property map used to create Kafka producers.
     *
     * @return a new mutable map holding the bootstrap servers, the key/value
     *         serializer classes, and the injected security and timing settings
     */
    @Bean
    @Override
    public Map<String, Object> producerConfigs() {
        final Map<String, Object> config = new HashMap<>();

        // Where to connect and how to serialize records.
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // Transport security and authentication.
        config.put("security.protocol", securityProtocol);
        config.put("ssl.endpoint.identification.algorithm", sslEndpointIdentificationAlgorithm);
        config.put("sasl.mechanism", saslMechanism);
        config.put("sasl.jaas.config", saslJaasConfig);

        // Timing values are passed through as the strings read from configuration.
        config.put("request.timeout.ms", requestTimeoutMs);
        config.put("retry.backoff.ms", retryBackoffMs);

        return config;
    }

    /**
     * Creates the producer factory from {@link #producerConfigs()}.
     *
     * @return a {@link DefaultKafkaProducerFactory} for {@link CustomerChangeEvent} values
     */
    @Bean
    @Override
    public ProducerFactory<String, CustomerChangeEvent> producerFactory() {
        final Map<String, Object> settings = producerConfigs();
        return new DefaultKafkaProducerFactory<>(settings);
    }

    /**
     * Creates the template used to publish {@link CustomerChangeEvent} messages.
     *
     * @return a {@link KafkaTemplate} built on {@link #producerFactory()}
     */
    @Bean
    @Override
    public KafkaTemplate<String, CustomerChangeEvent> kafkaTemplate() {
        final ProducerFactory<String, CustomerChangeEvent> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }

    /**
     * Exposes the application's {@link Sender} as a bean.
     *
     * @return a new {@link Sender} instance
     */
    @Bean
    @Override
    public Sender sender() {
        return new Sender();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment