Skip to content

Instantly share code, notes, and snippets.

@mike-seger
Last active May 26, 2024 00:38
Show Gist options
  • Save mike-seger/8bf31dfd7a5813b07374d5609244bcb3 to your computer and use it in GitHub Desktop.
Save mike-seger/8bf31dfd7a5813b07374d5609244bcb3 to your computer and use it in GitHub Desktop.
Dual Broker
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import java.util.HashMap;
import java.util.Map;
/**
 * Spring configuration for a second Kafka broker connection: exposes a
 * {@link KafkaTemplate} and a listener container factory built from the
 * {@code producer2} / {@code consumer2} sections of {@code KafkaProperties}.
 */
@Configuration
public class KafkaConfig {

    @Autowired
    private KafkaProperties kafkaProperties;

    /**
     * Creates the template used to publish to the second broker.
     *
     * @return a template backed by a producer factory configured with the
     *         bootstrap servers and serializers of the {@code producer2} section
     * @throws IllegalStateException if the {@code producer2} section is missing
     */
    // Producer 2
    @Bean
    @DependsOn("kafkaPropertiesInitializer")
    public KafkaTemplate<String, String> kafkaTemplate2() {
        // Fail with a readable message instead of a bare NullPointerException.
        requireConfigured(kafkaProperties.getProducer2(), "producer2");

        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProducer2().getBootstrapServers());
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaProperties.getProducer2().getKeySerializer());
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaProperties.getProducer2().getValueSerializer());
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(configProps));
    }

    /**
     * Creates the listener container factory used to consume from the second broker.
     *
     * @return a container factory whose consumer factory is configured with the
     *         bootstrap servers, group id and deserializers of the {@code consumer2} section
     * @throws IllegalStateException if the {@code consumer2} section is missing
     */
    // Consumer 2
    @Bean
    @DependsOn("kafkaPropertiesInitializer")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory2() {
        requireConfigured(kafkaProperties.getConsumer2(), "consumer2");

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getConsumer2().getBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumer2().getGroupId());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getConsumer2().getKeyDeserializer());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getConsumer2().getValueDeserializer());
        ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }

    /** Rejects a missing properties section with a message naming the section. */
    private static void requireConfigured(Object section, String sectionName) {
        if (section == null) {
            throw new IllegalStateException(
                    "Kafka properties section '" + sectionName + "' is not configured");
        }
    }
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.util.Map;
/**
 * Spring configuration that seeds {@code CustomKafkaProperties} from
 * {@code StandardKafkaProperties} and exposes a {@link KafkaTemplate}
 * built from the resulting producer settings.
 */
@Configuration
public class KafkaConfig {

    @Autowired
    private StandardKafkaProperties standardKafkaProperties;

    @Autowired
    private CustomKafkaProperties customKafkaProperties;

    /**
     * Runs after injection: replaces the custom producer and consumer maps with
     * copies of the standard ones, then overrides {@code some.property} in each.
     */
    @PostConstruct
    public void customizeKafkaProperties() {
        // Producer: start from a copy of the standard settings, then override.
        customKafkaProperties.setProducer(new HashMap<>(standardKafkaProperties.getProducer()));
        customKafkaProperties.getProducer().put("some.property", "new-value");

        // Consumer: same treatment.
        customKafkaProperties.setConsumer(new HashMap<>(standardKafkaProperties.getConsumer()));
        customKafkaProperties.getConsumer().put("some.property", "new-value");
    }

    /**
     * Builds a template whose producer factory is configured with a copy of the
     * custom producer settings.
     *
     * @return the template for the custom producer configuration
     */
    @Bean
    public KafkaTemplate<String, Object> customKafkaTemplate() {
        Map<String, Object> producerSettings = new HashMap<>();
        producerSettings.putAll(customKafkaProperties.getProducer());

        DefaultKafkaProducerFactory<String, Object> producerFactory =
                new DefaultKafkaProducerFactory<>(producerSettings);
        return new KafkaTemplate<>(producerFactory);
    }
}
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
 * Settings for the second Kafka producer and consumer, bound from the
 * {@code custom.kafka} prefix. Accessors are generated by Lombok's {@code @Data}.
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "custom.kafka")
public class KafkaProperties {

    // Pre-instantiated so the serializer defaults below are available, and
    // getProducer2()/getConsumer2() never return null, even when the
    // corresponding section is absent from the configuration.
    private Producer producer2 = new Producer();
    private Consumer consumer2 = new Consumer();

    /** Connection and serialization settings for a producer. */
    @Data
    public static class Producer {
        private String bootstrapServers;
        // Both serializers default to the String serializer's class name.
        private String keySerializer = StringSerializer.class.getName();
        private String valueSerializer = StringSerializer.class.getName();
    }

    /** Connection, group and deserialization settings for a consumer. */
    @Data
    public static class Consumer {
        private String bootstrapServers;
        private String groupId;
        // Both deserializers default to the String deserializer's class name.
        private String keyDeserializer = StringDeserializer.class.getName();
        private String valueDeserializer = StringDeserializer.class.getName();
    }
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
public class KafkaPropertiesInitializer implements ApplicationListener<ContextRefreshedEvent> {
@Autowired
private Environment env;
@Autowired
private CustomKafkaProperties customKafkaProperties;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
Map<String, Object> allKafkaProperties = new HashMap<>();
Map<String, Object> producerProperties = new HashMap<>();
Map<String, Object> consumerProperties = new HashMap<>();
// Collect all properties that start with spring.kafka
env.getPropertySources().forEach(propertySource -> {
if (propertySource.containsProperty("spring.kafka")) {
Map<String, Object> properties = (Map<String, Object>) propertySource.getSource();
properties.forEach((key, value) -> {
if (key.startsWith("spring.kafka.")) {
if (key.startsWith("spring.kafka.producer.")) {
producerProperties.put(key.substring("spring.kafka.producer.".length()), value);
} else if (key.startsWith("spring.kafka.consumer.")) {
consumerProperties.put(key.substring("spring.kafka.consumer.".length()), value);
} else {
allKafkaProperties.put(key.substring("spring.kafka.".length()), value);
}
}
});
}
});
// Now set them to the custom properties, assuming setters/getters are correctly set up
customKafkaProperties.setCommonProperties(allKafkaProperties);
customKafkaProperties.setProducer(producerProperties);
customKafkaProperties.setConsumer(consumerProperties);
}
}
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
 * On context refresh, binds the {@code spring.kafka} properties through a
 * {@link Binder} and stores them in {@code CustomKafkaProperties}, split into
 * producer, consumer and common settings.
 *
 * Binding to a map with {@code Object} values produces a nested structure
 * (top-level keys such as {@code producer} holding further maps), so nested
 * maps are flattened into dotted keys before being stored.
 */
@Component
public class KafkaPropertiesInitializer implements ApplicationListener<ContextRefreshedEvent> {

    private final Environment environment;
    private final CustomKafkaProperties customKafkaProperties;

    /**
     * @param environment           environment the Kafka properties are read from
     * @param customKafkaProperties holder that receives the categorized properties
     */
    public KafkaPropertiesInitializer(Environment environment, CustomKafkaProperties customKafkaProperties) {
        this.environment = environment;
        this.customKafkaProperties = customKafkaProperties;
    }

    /**
     * Binds, categorizes and assigns the Kafka properties.
     *
     * @param event the refresh event (not inspected)
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        Binder binder = Binder.get(environment);

        Map<String, Object> producerProperties = new HashMap<>();
        Map<String, Object> consumerProperties = new HashMap<>();
        Map<String, Object> commonProperties = new HashMap<>();

        // Entries of spring.kafka.properties go into the common settings first so
        // that top-level keys processed below can still override them.
        flattenInto(commonProperties, "", bindMap(binder, "spring.kafka.properties"));

        // Iterate and categorize properties
        for (Map.Entry<?, ?> entry : bindMap(binder, "spring.kafka").entrySet()) {
            String key = String.valueOf(entry.getKey());
            Object value = entry.getValue();
            boolean nested = value instanceof Map;
            if (nested && "producer".equals(key)) {
                flattenInto(producerProperties, "", (Map<?, ?>) value);
            } else if (nested && "consumer".equals(key)) {
                flattenInto(consumerProperties, "", (Map<?, ?>) value);
            } else if (nested && "properties".equals(key)) {
                // Already merged into the common settings above.
                continue;
            } else if (nested) {
                flattenInto(commonProperties, key, (Map<?, ?>) value);
            } else {
                commonProperties.put(key, value);
            }
        }

        // Assign to custom properties
        customKafkaProperties.setProducer(producerProperties);
        customKafkaProperties.setConsumer(consumerProperties);
        customKafkaProperties.setCommonProperties(commonProperties);
    }

    /** Binds the given prefix to a map, yielding an empty map when nothing is bound. */
    @SuppressWarnings("unchecked") // Binder is asked for a raw Map; entries are read defensively by callers.
    private static Map<String, Object> bindMap(Binder binder, String name) {
        return binder.bind(name, Map.class).orElseGet(HashMap::new);
    }

    /** Copies source into target, turning nested maps into dotted keys below prefix. */
    private static void flattenInto(Map<String, Object> target, String prefix, Map<?, ?> source) {
        for (Map.Entry<?, ?> entry : source.entrySet()) {
            String key = String.valueOf(entry.getKey());
            String path = prefix.isEmpty() ? key : prefix + "." + key;
            Object value = entry.getValue();
            if (value instanceof Map) {
                flattenInto(target, path, (Map<?, ?>) value);
            } else {
                target.put(path, value);
            }
        }
    }
}
import java.util.HashMap;
import java.util.Map;
/**
 * Utility for collapsing nested maps into a single-level map whose keys are
 * dot-separated paths.
 */
public class MapFlattener {

    /**
     * Flattens a hierarchical map into a flat map with keys as the path.
     * Nested maps are descended into; every other value is stored as its
     * {@code toString()} form. A {@code null} value is kept as {@code null}
     * under its path instead of causing a {@link NullPointerException}.
     *
     * @param map The hierarchical map to flatten; {@code null} is treated as empty.
     * @param prefix The current prefix to prepend to keys; {@code null} or empty means no prefix.
     * @return A new flat map with string keys and values.
     */
    public static Map<String, String> flattenMap(Map<String, Object> map, String prefix) {
        Map<String, String> flatMap = new HashMap<>();
        if (map != null) {
            collect(map, prefix == null ? "" : prefix, flatMap);
        }
        return flatMap;
    }

    /**
     * Convenience wrapper method for starting the recursion without a prefix.
     *
     * @param map The hierarchical map to flatten; {@code null} is treated as empty.
     * @return A new flat map with string keys and values.
     */
    public static Map<String, String> flattenMap(Map<String, Object> map) {
        return flattenMap(map, "");
    }

    /** Recursively writes every leaf of source into target under its dotted path. */
    private static void collect(Map<?, ?> source, String prefix, Map<String, String> target) {
        for (Map.Entry<?, ?> entry : source.entrySet()) {
            // Nested maps are not guaranteed to have String keys, so convert
            // rather than cast.
            String key = String.valueOf(entry.getKey());
            String path = prefix.isEmpty() ? key : prefix + "." + key;
            Object value = entry.getValue();
            if (value instanceof Map<?, ?>) {
                // Write straight into the shared result; no temporary map per level.
                collect((Map<?, ?>) value, path, target);
            } else {
                target.put(path, value == null ? null : value.toString());
            }
        }
    }
}
// Obtain a Binder for the application context's environment.
Binder binder = Binder.get(applicationContext.getEnvironment());
// Describe the binding target as a Map with String keys and Object values.
Bindable<Map<String, Object>> bindableMap = Bindable.mapOf(String.class, Object.class);
// Bind everything under "spring.kafka"; use an empty map when nothing is bound.
Map<String, Object> kafkaProps = binder.bind("spring.kafka", bindableMap).orElse(new HashMap<>());
// Collapse the bound map into a flat map of string values via MapFlattener.
Map<String, String> flatKafkaProps = MapFlattener.flattenMap(kafkaProps);
import java.util.HashMap;
import java.util.Map;
/**
 * Helper for rewriting map keys from dash-separated to dot-separated form.
 */
public class MapKeyTransformer {

    /**
     * Produces a copy of the given map in which every {@code '-'} in a key is
     * replaced by {@code '.'}. Values are carried over unchanged and the input
     * map is not modified.
     *
     * @param originalMap The map whose keys may contain dashes.
     * @return A new map holding the same values under the rewritten keys.
     */
    public static Map<String, Object> convertDashesToDotsInKeys(Map<String, Object> originalMap) {
        Map<String, Object> dottedKeys = new HashMap<>();
        originalMap.forEach((key, value) -> dottedKeys.put(key.replace('-', '.'), value));
        return dottedKeys;
    }
}
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.context.ConfigurableApplicationContext;
/**
 * Application runner that binds the {@code custom.kafka} properties from the
 * application context's environment and prints the common properties.
 */
@Component
public class PropertyBindingApplicationRunner implements ApplicationRunner {

    private final ConfigurableApplicationContext applicationContext;

    /**
     * @param applicationContext context whose environment the properties are bound from
     */
    public PropertyBindingApplicationRunner(ConfigurableApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    /**
     * Binds {@code custom.kafka} to {@code CustomKafkaProperties} and prints its
     * common properties to standard output.
     *
     * @param args application arguments (not used)
     * @throws IllegalStateException if nothing is bound under {@code custom.kafka}
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        Binder binder = Binder.get(applicationContext.getEnvironment());
        // Bind properties as needed. BindResult.orElseThrow needs an exception
        // supplier; there is no zero-argument overload.
        CustomKafkaProperties properties = binder.bind("custom.kafka", CustomKafkaProperties.class)
                .orElseThrow(() -> new IllegalStateException(
                        "No configuration properties bound under 'custom.kafka'"));
        // Use the properties as needed
        System.out.println(properties.getCommonProperties());
    }
}
spring.context.initializer.classes=com.example.KafkaPropertiesInitializer
/**
 * Application context initializer that binds the "custom.kafka" properties
 * from the context's environment to CustomKafkaProperties.
 * NOTE(review): the snippet ends right after the bind call; what is done with
 * the bound object is not shown here.
 */
public class KafkaPropertiesInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
@Override
public void initialize(ConfigurableApplicationContext applicationContext) {
// Read the environment from the context being initialized
Environment environment = applicationContext.getEnvironment();
// Create a Binder object from the environment
Binder binder = Binder.get(environment);
// Bind the properties to the CustomKafkaProperties class
// NOTE(review): get() presumably fails when nothing is bound under "custom.kafka" - confirm and consider a fallback
CustomKafkaProperties properties = binder.bind("custom.kafka", CustomKafkaProperties.class).get();
/**
 * Application context initializer that obtains a Binder for the context's environment.
 * NOTE(review): the snippet ends after the Binder is created; how it is used is not shown here.
 */
public class PropertiesInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
@Override
public void initialize(ConfigurableApplicationContext applicationContext) {
// Assuming you can still access some environment properties indirectly
// Create the Binder from the environment of the context being initialized
Binder binder = Binder.get(applicationContext.getEnvironment());
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
/**
 * Holder for the producer and consumer settings declared under the
 * "spring.kafka" prefix. Lombok's @Data supplies the accessors.
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "spring.kafka")
public class StandardKafkaProperties {
// Producer settings as string key/value pairs; starts out empty.
private Map<String, String> producer = new HashMap<>();
// Consumer settings as string key/value pairs; starts out empty.
private Map<String, String> consumer = new HashMap<>();
}
/**
 * Holder for the customized Kafka settings declared under the
 * {@code custom.kafka} prefix. Lombok's {@code @Data} supplies the accessors.
 */
@Data
@Component
@ConfigurationProperties(prefix = "custom.kafka")
public class CustomKafkaProperties {

    /** Producer settings as string key/value pairs; starts out empty. */
    private Map<String, String> producer = new HashMap<>();

    /** Consumer settings as string key/value pairs; starts out empty. */
    private Map<String, String> consumer = new HashMap<>();

    /**
     * Settings shared by producer and consumer; starts out empty. Added because
     * other classes in this file call {@code setCommonProperties(...)} and
     * {@code getCommonProperties()}; values are {@code Object} because those
     * callers pass a {@code Map<String, Object>}.
     * NOTE(review): the same callers also pass {@code Map<String, Object>} to
     * {@code setProducer}/{@code setConsumer}, which does not match the
     * {@code Map<String, String>} fields above - confirm the intended value type.
     */
    private Map<String, Object> commonProperties = new HashMap<>();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment