Skip to content

Instantly share code, notes, and snippets.

@dfrommi
Last active June 20, 2023 22:20
Show Gist options
  • Save dfrommi/08f90ab69b907d9b4f8bcc3e7174495b to your computer and use it in GitHub Desktop.
Save dfrommi/08f90ab69b907d9b4f8bcc3e7174495b to your computer and use it in GitHub Desktop.
Resilience4j Event-Handler to start/stop RabbitMQ-Listeners

Resilient RabbitMQ-Listener with Spring

In case of high error rates during message processing, it might be beneficial to stop message consumption for some time. This not only reduces the number of errors in the system, but also allows other consumers to process the messages successfully.

Circuit-breakers are made for detecting high error rates and for stopping and restarting the processing accordingly.

There is no ready-made integration between RabbitMQ listeners and circuit-breakers, but the gap can easily be filled with a Resilience4j event handler that starts and stops the RabbitMQ listeners.

package dfrommi.adapter.in.messaging;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Example RabbitMQ consumer whose listener method is guarded by a Resilience4j circuit breaker.
 *
 * <p>The listener container id and the circuit-breaker name are both taken from {@link #ID}.
 * Keeping the two identical is what allows a circuit breaker to be mapped back to the listener
 * container it protects by name alone.
 */
@Component
public class MyEventConsumer {
// Shared identifier: used as the @RabbitListener id and as the @CircuitBreaker name below.
private static final String ID = "my-event-consumer";
/**
 * Handles a single message delivered from queue {@code my-event-queue}, which is bound to the
 * topic exchange {@code my-event-exchange} with routing key {@code MyEvent}.
 *
 * <p>The body is a placeholder; per the inline notes, processing errors are signalled by
 * throwing an exception.
 * NOTE(review): presumably those exceptions are what the circuit breaker named {@link #ID}
 * counts as failures - confirm against the Resilience4j configuration.
 *
 * @param payload the converted message body
 * @param message the raw AMQP message
 */
@RabbitListener(
id = ID,
ackMode = "AUTO",
bindings = @QueueBinding(
value = @Queue(value = "my-event-queue", durable = "true", exclusive = "false", autoDelete = "false"),
exchange = @Exchange(value = "my-event-exchange", type = ExchangeTypes.TOPIC),
key = "MyEvent")
)
@CircuitBreaker(name = ID) //`name` has to be the same as RabbitListener `id`
public void processCheckCompatibilityMessage(MyEvent payload, Message message) {
//process event
//throw exception on error
}
}
package dfrommi.infrastructure.messaging;
import static io.github.resilience4j.circuitbreaker.CircuitBreaker.State.CLOSED;
import static io.github.resilience4j.circuitbreaker.CircuitBreaker.State.FORCED_OPEN;
import static io.github.resilience4j.circuitbreaker.CircuitBreaker.State.HALF_OPEN;
import static io.github.resilience4j.circuitbreaker.CircuitBreaker.State.OPEN;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.core.registry.EntryAddedEvent;
import io.github.resilience4j.core.registry.EntryRemovedEvent;
import io.github.resilience4j.core.registry.EntryReplacedEvent;
import io.github.resilience4j.core.registry.RegistryEventConsumer;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Slf4j
@RequiredArgsConstructor
@Component
public class RabbitCircuitBreakerIntegration implements RegistryEventConsumer<CircuitBreaker> {
private final RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
private Set<CircuitBreaker> circuitBreakersToWatch = new HashSet<>();
@Override
public void onEntryAddedEvent(EntryAddedEvent<CircuitBreaker> entryAddedEvent) {
var cb = entryAddedEvent.getAddedEntry();
cb.getEventPublisher().onStateTransition(transition ->
synchronizeCircuitBreakerWithListener(transition.getCircuitBreakerName(), transition.getStateTransition().getToState())
);
circuitBreakersToWatch.add(cb);
}
@Override
public void onEntryRemovedEvent(EntryRemovedEvent<CircuitBreaker> entryRemoveEvent) {
circuitBreakersToWatch.remove(entryRemoveEvent.getRemovedEntry());
}
@Override
public void onEntryReplacedEvent(EntryReplacedEvent<CircuitBreaker> entryReplacedEvent) {
//same as delete and add
}
//Just in case anything goes wrong in the CircuitBreaker transition handler, then the listener will not remain stopped forever
@Scheduled(fixedRateString = "PT2M")
public void listenerRunningWatchdog() {
circuitBreakersToWatch.forEach(cb ->
synchronizeCircuitBreakerWithListener(cb.getName(), cb.getState())
);
}
protected void synchronizeCircuitBreakerWithListener(String circuitBreakerName, CircuitBreaker.State state) {
//Listener not available when circuit-breaker is added
String listenerId = getListenerIdFromCircuitBreakerName(circuitBreakerName);
var listener = rabbitListenerEndpointRegistry.getListenerContainer(listenerId);
if (listener == null) {
return;
}
if ((state == CLOSED || state == HALF_OPEN) && !listener.isRunning()) {
log.info("Starting RabbitMQ listener {} because state of circuit-breaker {} is {}", listenerId, circuitBreakerName, state);
try {
listener.start();
} catch (Exception e) {
log.error("Error starting RabbitMQ listener {} after state of circuit-breaker {} is {}", listenerId, circuitBreakerName, state);
}
}
if ((state == OPEN || state == FORCED_OPEN) && listener.isRunning()) {
log.info("Stopping RabbitMQ listener {} because state of circuit-breaker {} is {}", listenerId, circuitBreakerName, state);
try {
listener.stop();
} catch (Exception e) {
log.error("Error stopping RabbitMQ listener {} after state of circuit-breaker {} is {}", listenerId, circuitBreakerName, state);
}
}
}
private String getListenerIdFromCircuitBreakerName(String circuitBreakerName) {
//Listener-id has same value as CircuitBreaker name
return circuitBreakerName;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment