Skip to content

Instantly share code, notes, and snippets.

@jesty
Created April 7, 2019 15:34
Show Gist options
  • Save jesty/548f20abf4670abb807e6c0cd4a04899 to your computer and use it in GitHub Desktop.
Save jesty/548f20abf4670abb807e6c0cd4a04899 to your computer and use it in GitHub Desktop.
This class is useful when you need to stop consuming from Kafka in an application that uses Spring Cloud Stream with Kafka. I use it to stop consuming while management operations run on my microservice.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.BindingCreatedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;
/**
 * Collects every binding announced through a {@link BindingCreatedEvent} and lets callers
 * start or stop all collected bindings in one call.
 *
 * <p>Bindings are stored by the value of {@link Binding#getName()}; a later binding that
 * reports the same name replaces the earlier entry.
 */
@Component
public class SpringKafkaStartStop {

    // One logger for the class rather than one per instance.
    private static final Logger log = LoggerFactory.getLogger(SpringKafkaStartStop.class);

    /** Collected bindings, keyed by the name each binding reports. */
    private final Map<String, Binding<MessageChannel>> bindingChannels = new ConcurrentHashMap<>();

    /**
     * Records the binding carried by the event so it can be started or stopped later.
     *
     * <p>A binding that reports no name is logged and skipped, because the backing
     * {@link ConcurrentHashMap} does not accept {@code null} keys.
     *
     * @param event the event whose source is the newly created binding
     */
    @EventListener(BindingCreatedEvent.class)
    public void listenBinding(BindingCreatedEvent event) {
        // The generic parameter is erased at runtime, so this cast cannot be verified.
        // NOTE(review): assumes every binding in this application wraps a MessageChannel - confirm.
        @SuppressWarnings("unchecked")
        Binding<MessageChannel> binding = (Binding<MessageChannel>) event.getSource();
        log.info("Collecting binding event: {}", binding);

        String name = binding.getName();
        if (name == null) {
            // Without this guard the put below would throw NullPointerException in the listener.
            log.warn("Skipping binding without a name: {}", binding);
            return;
        }
        bindingChannels.put(name, binding);
    }

    /**
     * Calls {@code start()} on every collected binding.
     *
     * @throws IllegalStateException if a {@link BeansException} is raised while starting a binding
     */
    public void start() {
        try {
            bindingChannels.forEach((name, binding) -> {
                log.info("Binding {} {}", name, binding);
                binding.start();
            });
        } catch (BeansException e) {
            throw new IllegalStateException("Cannot perform binding, no proper implementation found", e);
        }
    }

    /**
     * Calls {@code stop()} on every collected binding.
     *
     * @throws IllegalStateException if a {@link BeansException} is raised while stopping a binding
     */
    public void stop() {
        try {
            bindingChannels.forEach((name, binding) -> {
                log.info("Unbinding {}", name);
                binding.stop();
            });
        } catch (BeansException e) {
            throw new IllegalStateException("Cannot perform unbinding, no proper implementation found", e);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment