|
package dfrommi.infrastructure.messaging; |
|
|
|
import static io.github.resilience4j.circuitbreaker.CircuitBreaker.State.CLOSED; |
|
import static io.github.resilience4j.circuitbreaker.CircuitBreaker.State.FORCED_OPEN; |
|
import static io.github.resilience4j.circuitbreaker.CircuitBreaker.State.HALF_OPEN; |
|
import static io.github.resilience4j.circuitbreaker.CircuitBreaker.State.OPEN; |
|
|
|
import io.github.resilience4j.circuitbreaker.CircuitBreaker; |
|
import io.github.resilience4j.core.registry.EntryAddedEvent; |
|
import io.github.resilience4j.core.registry.EntryRemovedEvent; |
|
import io.github.resilience4j.core.registry.EntryReplacedEvent; |
|
import io.github.resilience4j.core.registry.RegistryEventConsumer; |
|
import java.util.HashSet; |
|
import java.util.Set; |
|
import lombok.RequiredArgsConstructor; |
|
import lombok.extern.slf4j.Slf4j; |
|
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry; |
|
import org.springframework.scheduling.annotation.Scheduled; |
|
import org.springframework.stereotype.Component; |
|
|
|
@Slf4j |
|
@RequiredArgsConstructor |
|
@Component |
|
public class RabbitCircuitBreakerIntegration implements RegistryEventConsumer<CircuitBreaker> { |
|
private final RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry; |
|
private Set<CircuitBreaker> circuitBreakersToWatch = new HashSet<>(); |
|
|
|
@Override |
|
public void onEntryAddedEvent(EntryAddedEvent<CircuitBreaker> entryAddedEvent) { |
|
var cb = entryAddedEvent.getAddedEntry(); |
|
|
|
cb.getEventPublisher().onStateTransition(transition -> |
|
synchronizeCircuitBreakerWithListener(transition.getCircuitBreakerName(), transition.getStateTransition().getToState()) |
|
); |
|
|
|
circuitBreakersToWatch.add(cb); |
|
} |
|
|
|
@Override |
|
public void onEntryRemovedEvent(EntryRemovedEvent<CircuitBreaker> entryRemoveEvent) { |
|
circuitBreakersToWatch.remove(entryRemoveEvent.getRemovedEntry()); |
|
} |
|
|
|
@Override |
|
public void onEntryReplacedEvent(EntryReplacedEvent<CircuitBreaker> entryReplacedEvent) { |
|
//same as delete and add |
|
} |
|
|
|
//Just in case anything goes wrong in the CircuitBreaker transition handler, then the listener will not remain stopped forever |
|
@Scheduled(fixedRateString = "PT2M") |
|
public void listenerRunningWatchdog() { |
|
circuitBreakersToWatch.forEach(cb -> |
|
synchronizeCircuitBreakerWithListener(cb.getName(), cb.getState()) |
|
); |
|
} |
|
|
|
protected void synchronizeCircuitBreakerWithListener(String circuitBreakerName, CircuitBreaker.State state) { |
|
//Listener not available when circuit-breaker is added |
|
String listenerId = getListenerIdFromCircuitBreakerName(circuitBreakerName); |
|
var listener = rabbitListenerEndpointRegistry.getListenerContainer(listenerId); |
|
if (listener == null) { |
|
return; |
|
} |
|
|
|
if ((state == CLOSED || state == HALF_OPEN) && !listener.isRunning()) { |
|
log.info("Starting RabbitMQ listener {} because state of circuit-breaker {} is {}", listenerId, circuitBreakerName, state); |
|
try { |
|
listener.start(); |
|
} catch (Exception e) { |
|
log.error("Error starting RabbitMQ listener {} after state of circuit-breaker {} is {}", listenerId, circuitBreakerName, state); |
|
} |
|
} |
|
|
|
if ((state == OPEN || state == FORCED_OPEN) && listener.isRunning()) { |
|
log.info("Stopping RabbitMQ listener {} because state of circuit-breaker {} is {}", listenerId, circuitBreakerName, state); |
|
try { |
|
listener.stop(); |
|
} catch (Exception e) { |
|
log.error("Error stopping RabbitMQ listener {} after state of circuit-breaker {} is {}", listenerId, circuitBreakerName, state); |
|
} |
|
} |
|
} |
|
|
|
private String getListenerIdFromCircuitBreakerName(String circuitBreakerName) { |
|
//Listener-id has same value as CircuitBreaker name |
|
return circuitBreakerName; |
|
} |
|
} |