@harishb2k
Last active October 6, 2022 12:58
Messaging Properties - Kafka Spring Boot Integration
# Producer configuration for sending update-user events
# Update the Kafka brokers and topic name for your own setup
messaging.producers.updateUserEvent.enabled: true
messaging.producers.updateUserEvent.topic: topic_123
messaging.producers.updateUserEvent.brokers: localhost:9092
messaging.producers.updateUserEvent.sync: true
messaging.producers.updateUserEvent.enableCircuitBreakerOnError: true
messaging.producers.updateUserEvent.request-timeout-ms: 100
messaging.producers.updateUserEvent.ack: all
messaging.producers.updateUserEvent.circuit-breaker-stay_in_open_state_on_error-ms: 10000

Note: you can also set other Kafka properties, e.g. linger.ms or batch.size. Because this is a properties file, write "-" instead of "." in Kafka property names: the Kafka property "request.timeout.ms" must appear in the config as "request-timeout-ms".
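To make the naming rule concrete, here is a minimal sketch of the mapping from the key written in the properties file to the native Kafka name. The class `KafkaPropertyNames` and its methods are hypothetical and only illustrate the convention; how the library itself converts keys is not shown in this gist. A plain character replacement like this only suits keys such as request-timeout-ms, linger-ms or batch-size, whose Kafka names contain no hyphens.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the messaging library.
class KafkaPropertyNames {

    // Convert a key as written in the properties file ("request-timeout-ms")
    // to the native Kafka property name ("request.timeout.ms").
    static String toKafkaKey(String configKey) {
        return configKey.replace('-', '.');
    }

    // Convert every key in a map, keeping values and insertion order unchanged.
    static Map<String, String> toKafkaProperties(Map<String, String> config) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            result.put(toKafkaKey(entry.getKey()), entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("request-timeout-ms", "100");
        config.put("linger-ms", "5");
        config.put("batch-size", "16384");
        System.out.println(toKafkaProperties(config));
    }
}
```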

What does the enableCircuitBreakerOnError setting do? Suppose there is an error in your Kafka setup. With enableCircuitBreakerOnError=true, a circuit breaker opens so that the Kafka failure does not bring down your service. Please note: while the circuit is open, messages are dropped to keep the application resilient (the standard circuit breaker approach).

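Use circuit-breaker.stay_in_open_state_on_error.ms (written as circuit-breaker-stay_in_open_state_on_error-ms in the properties file above) to specify how long, in milliseconds, the circuit stays open before sending is retried.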
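The behaviour described above (open on error, drop messages while open, retry after the configured time) can be sketched roughly as follows. This is an illustration under stated assumptions, not the library's implementation: the class `DroppingCircuitBreaker` is hypothetical, it opens on the first error rather than after a failure threshold, and the clock is injected only so the timing can be tested.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch for illustration only; not the library's circuit breaker.
class DroppingCircuitBreaker {
    private final long stayOpenMs;      // plays the role of the stay-in-open-state setting
    private final LongSupplier clock;   // supplies the current time in milliseconds
    private boolean open = false;
    private long openedAtMs = 0L;

    DroppingCircuitBreaker(long stayOpenMs, LongSupplier clock) {
        this.stayOpenMs = stayOpenMs;
        this.clock = clock;
    }

    // Returns true if the sender ran successfully; false if the message
    // was dropped (circuit open) or the send attempt failed.
    synchronized boolean send(Runnable sender) {
        long now = clock.getAsLong();
        if (open && now - openedAtMs < stayOpenMs) {
            return false; // circuit is open: drop the message, do not call Kafka
        }
        try {
            sender.run();     // circuit closed, or the open period elapsed: try again
            open = false;     // success closes the circuit
            return true;
        } catch (RuntimeException e) {
            open = true;      // assumption: a single error opens the circuit
            openedAtMs = now;
            return false;
        }
    }

    synchronized boolean isOpen() {
        return open;
    }
}
```

With a value of 10000, as in the example config, a failed send would cause every message in the next 10 seconds to be dropped without touching the broker; the first send after that window is attempted again and closes the circuit if it succeeds.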

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Getter
@Setter
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
@Configuration
@ConfigurationProperties(prefix = "messaging")
public class MessagingConfigs extends io.github.devlibx.easy.messaging.config.MessagingConfigs {
}

import io.gitbub.devlibx.easy.helper.metrics.IMetrics;
import io.gitbub.devlibx.easy.helper.string.StringHelper;
import io.github.devlibx.easy.messaging.config.MessagingConfigs;
import io.github.devlibx.easy.messaging.consumer.IConsumerService;
import io.github.devlibx.easy.messaging.kafka.consumer.KafkaBasedConsumerService;
import io.github.devlibx.easy.messaging.kafka.producer.KafkaBasedProducerService;
import io.github.devlibx.easy.messaging.producer.IProducerService;
import io.github.devlibx.easy.messaging.service.IMessagingFactory;
import io.github.devlibx.easy.messaging.service.MessageFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MessagingConfigurationBeanFactory {

    @Bean
    public StringHelper stringHelper() {
        return new StringHelper();
    }

    @Bean
    public IMetrics metrics() {
        return new IMetrics.NoOpMetrics();
    }

    @Bean
    @Autowired
    public IMessagingFactory messagingFactory(MessagingConfigs messagingConfigs, StringHelper stringHelper, IMetrics metrics) {
        // Register the Kafka producer implementation under the key "KAFKA"
        Map<String, IProducerService> producerServiceMap = new HashMap<>();
        producerServiceMap.put("KAFKA", new KafkaBasedProducerService(stringHelper, metrics));

        // Register the Kafka consumer implementation under the key "KAFKA"
        Map<String, IConsumerService> consumerServiceMap = new HashMap<>();
        consumerServiceMap.put("KAFKA", new KafkaBasedConsumerService(metrics));

        // Build the factory from the "messaging.*" properties and initialize it before exposing it as a bean
        IMessagingFactory messagingFactory = new MessageFactory(producerServiceMap, consumerServiceMap, messagingConfigs);
        messagingFactory.initialize();
        return messagingFactory;
    }
}