Last active
May 26, 2024 00:38
-
-
Save mike-seger/8bf31dfd7a5813b07374d5609244bcb3 to your computer and use it in GitHub Desktop.
Dual Broker
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.kafka.clients.consumer.ConsumerConfig; | |
import org.apache.kafka.clients.producer.ProducerConfig; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; | |
import org.springframework.kafka.core.DefaultKafkaProducerFactory; | |
import org.springframework.kafka.core.KafkaTemplate; | |
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; | |
import org.springframework.kafka.core.ConsumerFactory; | |
import java.util.HashMap; | |
import java.util.Map; | |
@Configuration | |
public class KafkaConfig { | |
@Autowired | |
private KafkaProperties kafkaProperties; | |
// Producer 2 | |
@Bean | |
@DependsOn("kafkaPropertiesInitializer") | |
public KafkaTemplate<String, String> kafkaTemplate2() { | |
Map<String, Object> configProps = new HashMap<>(); | |
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProducer2().getBootstrapServers()); | |
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaProperties.getProducer2().getKeySerializer()); | |
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaProperties.getProducer2().getValueSerializer()); | |
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(configProps)); | |
} | |
// Consumer 2 | |
@Bean | |
@DependsOn("kafkaPropertiesInitializer") | |
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory2() { | |
Map<String, Object> props = new HashMap<>(); | |
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getConsumer2().getBootstrapServers()); | |
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumer2().getGroupId()); | |
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getConsumer2().getKeyDeserializer()); | |
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getConsumer2().getValueDeserializer()); | |
ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(props); | |
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); | |
factory.setConsumerFactory(consumerFactory); | |
return factory; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import javax.annotation.PostConstruct; | |
import java.util.Map; | |
@Configuration | |
public class KafkaConfig { | |
@Autowired | |
private StandardKafkaProperties standardKafkaProperties; | |
@Autowired | |
private CustomKafkaProperties customKafkaProperties; | |
@PostConstruct | |
public void customizeKafkaProperties() { | |
// Copy all standard properties | |
customKafkaProperties.setProducer(new HashMap<>(standardKafkaProperties.getProducer())); | |
customKafkaProperties.setConsumer(new HashMap<>(standardKafkaProperties.getConsumer())); | |
// Adjust specific properties | |
customKafkaProperties.getProducer().put("some.property", "new-value"); // Adjust or add custom settings | |
customKafkaProperties.getConsumer().put("some.property", "new-value"); | |
} | |
@Bean | |
public KafkaTemplate<String, Object> customKafkaTemplate() { | |
// Use customKafkaProperties to configure this template | |
Map<String, Object> props = new HashMap<>(); | |
props.putAll(customKafkaProperties.getProducer()); | |
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props)); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import lombok.Data; | |
import org.springframework.boot.context.properties.ConfigurationProperties; | |
import org.springframework.context.annotation.Configuration; | |
@Data | |
@Configuration | |
@ConfigurationProperties(prefix = "custom.kafka") | |
public class KafkaProperties { | |
private Producer producer2; | |
private Consumer consumer2; | |
@Data | |
public static class Producer { | |
private String bootstrapServers; | |
private String keySerializer = StringSerializer.class.getName(); | |
private String valueSerializer = StringSerializer.class.getName(); | |
} | |
@Data | |
public static class Consumer { | |
private String bootstrapServers; | |
private String groupId; | |
private String keyDeserializer = StringDeserializer.class.getName(); | |
private String valueDeserializer = StringDeserializer.class.getName(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.context.ApplicationListener; | |
import org.springframework.context.event.ContextRefreshedEvent; | |
import org.springframework.core.env.Environment; | |
import org.springframework.stereotype.Component; | |
import java.util.HashMap; | |
import java.util.Map; | |
@Component | |
public class KafkaPropertiesInitializer implements ApplicationListener<ContextRefreshedEvent> { | |
@Autowired | |
private Environment env; | |
@Autowired | |
private CustomKafkaProperties customKafkaProperties; | |
@Override | |
public void onApplicationEvent(ContextRefreshedEvent event) { | |
Map<String, Object> allKafkaProperties = new HashMap<>(); | |
Map<String, Object> producerProperties = new HashMap<>(); | |
Map<String, Object> consumerProperties = new HashMap<>(); | |
// Collect all properties that start with spring.kafka | |
env.getPropertySources().forEach(propertySource -> { | |
if (propertySource.containsProperty("spring.kafka")) { | |
Map<String, Object> properties = (Map<String, Object>) propertySource.getSource(); | |
properties.forEach((key, value) -> { | |
if (key.startsWith("spring.kafka.")) { | |
if (key.startsWith("spring.kafka.producer.")) { | |
producerProperties.put(key.substring("spring.kafka.producer.".length()), value); | |
} else if (key.startsWith("spring.kafka.consumer.")) { | |
consumerProperties.put(key.substring("spring.kafka.consumer.".length()), value); | |
} else { | |
allKafkaProperties.put(key.substring("spring.kafka.".length()), value); | |
} | |
} | |
}); | |
} | |
}); | |
// Now set them to the custom properties, assuming setters/getters are correctly set up | |
customKafkaProperties.setCommonProperties(allKafkaProperties); | |
customKafkaProperties.setProducer(producerProperties); | |
customKafkaProperties.setConsumer(consumerProperties); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.springframework.boot.context.properties.bind.Binder; | |
import org.springframework.context.ApplicationListener; | |
import org.springframework.context.event.ContextRefreshedEvent; | |
import org.springframework.core.env.Environment; | |
import org.springframework.stereotype.Component; | |
import java.util.HashMap; | |
import java.util.Map; | |
@Component | |
public class KafkaPropertiesInitializer implements ApplicationListener<ContextRefreshedEvent> { | |
private final Environment environment; | |
private final CustomKafkaProperties customKafkaProperties; | |
public KafkaPropertiesInitializer(Environment environment, CustomKafkaProperties customKafkaProperties) { | |
this.environment = environment; | |
this.customKafkaProperties = customKafkaProperties; | |
} | |
@Override | |
public void onApplicationEvent(ContextRefreshedEvent event) { | |
Binder binder = Binder.get(environment); | |
// Bind properties under spring.kafka to a Map | |
Map<String, Object> kafkaProperties = binder.bind("spring.kafka", Map.class).get(); | |
Map<String, Object> producerProperties = new HashMap<>(); | |
Map<String, Object> consumerProperties = new HashMap<>(); | |
Map<String, Object> commonProperties = binder.bind("spring.kafka.properties", Map.class).orElseGet(HashMap::new); | |
// Iterate and categorize properties | |
kafkaProperties.forEach((key, value) -> { | |
if (key.startsWith("producer.")) { | |
producerProperties.put(key.substring(9), value); | |
} else if (key.startsWith("consumer.")) { | |
consumerProperties.put(key.substring(9), value); | |
} else { | |
commonProperties.put(key, value); | |
} | |
}); | |
// Assign to custom properties | |
customKafkaProperties.setProducer(producerProperties); | |
customKafkaProperties.setConsumer(consumerProperties); | |
customKafkaProperties.setCommonProperties(commonProperties); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.HashMap; | |
import java.util.Map; | |
public class MapFlattener { | |
/** | |
* Flattens a hierarchical map into a flat map with keys as the path. | |
* @param map The hierarchical map to flatten. | |
* @param prefix The current prefix to prepend to keys (used recursively). | |
* @return A flat map with string keys and values. | |
*/ | |
public static Map<String, String> flattenMap(Map<String, Object> map, String prefix) { | |
Map<String, String> flatMap = new HashMap<>(); | |
for (Map.Entry<String, Object> entry : map.entrySet()) { | |
String key = entry.getKey(); | |
Object value = entry.getValue(); | |
String newKey = prefix.isEmpty() ? key : prefix + "." + key; | |
if (value instanceof Map<?, ?>) { | |
// Recursive call to handle nested maps | |
flatMap.putAll(flattenMap((Map<String, Object>) value, newKey)); | |
} else { | |
// Put the value converted to string in the flat map | |
flatMap.put(newKey, value.toString()); | |
} | |
} | |
return flatMap; | |
} | |
/** | |
* Convenience wrapper method for starting the recursion without a prefix. | |
* @param map The hierarchical map to flatten. | |
* @return A flat map with string keys and values. | |
*/ | |
public static Map<String, String> flattenMap(Map<String, Object> map) { | |
return flattenMap(map, ""); | |
} | |
} | |
Binder binder = Binder.get(applicationContext.getEnvironment()); | |
Bindable<Map<String, Object>> bindableMap = Bindable.mapOf(String.class, Object.class); | |
Map<String, Object> kafkaProps = binder.bind("spring.kafka", bindableMap).orElse(new HashMap<>()); | |
Map<String, String> flatKafkaProps = MapFlattener.flattenMap(kafkaProps); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.HashMap; | |
import java.util.Map; | |
public class MapKeyTransformer { | |
/** | |
* Transforms keys in the provided map by replacing dashes with dots. | |
* @param originalMap The original map with keys possibly containing dashes. | |
* @return A new map with keys where dashes have been replaced with dots. | |
*/ | |
public static Map<String, Object> convertDashesToDotsInKeys(Map<String, Object> originalMap) { | |
Map<String, Object> transformedMap = new HashMap<>(); | |
for (Map.Entry<String, Object> entry : originalMap.entrySet()) { | |
String transformedKey = entry.getKey().replace('-', '.'); | |
transformedMap.put(transformedKey, entry.getValue()); | |
} | |
return transformedMap; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.springframework.boot.ApplicationArguments; | |
import org.springframework.boot.ApplicationRunner; | |
import org.springframework.stereotype.Component; | |
import org.springframework.boot.context.properties.bind.Binder; | |
import org.springframework.context.ConfigurableApplicationContext; | |
@Component | |
public class PropertyBindingApplicationRunner implements ApplicationRunner { | |
private final ConfigurableApplicationContext applicationContext; | |
public PropertyBindingApplicationRunner(ConfigurableApplicationContext applicationContext) { | |
this.applicationContext = applicationContext; | |
} | |
@Override | |
public void run(ApplicationArguments args) throws Exception { | |
Binder binder = Binder.get(applicationContext.getEnvironment()); | |
// Bind properties as needed | |
CustomKafkaProperties properties = binder.bind("custom.kafka", CustomKafkaProperties.class).orElseThrow(); | |
// Use the properties as needed | |
System.out.println(properties.getCommonProperties()); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
spring.context.initializer.classes=com.example.KafkaPropertiesInitializer | |
public class KafkaPropertiesInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> { | |
@Override | |
public void initialize(ConfigurableApplicationContext applicationContext) { | |
Environment environment = applicationContext.getEnvironment(); | |
// Create a Binder object from the environment | |
Binder binder = Binder.get(environment); | |
// Bind the properties to the CustomKafkaProperties class | |
CustomKafkaProperties properties = binder.bind("custom.kafka", CustomKafkaProperties.class).get(); | |
public class PropertiesInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> { | |
@Override | |
public void initialize(ConfigurableApplicationContext applicationContext) { | |
// Assuming you can still access some environment properties indirectly | |
Binder binder = Binder.get(applicationContext.getEnvironment()); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import lombok.Data; | |
import org.springframework.boot.context.properties.ConfigurationProperties; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.stereotype.Component; | |
@Data | |
@Configuration | |
@ConfigurationProperties(prefix = "spring.kafka") | |
public class StandardKafkaProperties { | |
private Map<String, String> producer = new HashMap<>(); | |
private Map<String, String> consumer = new HashMap<>(); | |
} | |
@Data | |
@Component | |
@ConfigurationProperties(prefix = "custom.kafka") | |
public class CustomKafkaProperties { | |
private Map<String, String> producer = new HashMap<>(); | |
private Map<String, String> consumer = new HashMap<>(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment