Created
November 7, 2023 16:16
-
-
Save thomasmartin-whoz/38624f35ea7b11ccdf504545b30b74fb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.biznet.event.handler | |
import io.biznet.event.EventHandlerProperties | |
import io.biznet.event.IdentifiedApplicationEvent | |
import mu.KLogging | |
import org.springframework.context.ApplicationListener | |
import reactor.core.publisher.Flux | |
import reactor.core.publisher.Mono | |
import reactor.core.publisher.Sinks | |
import reactor.core.scheduler.Scheduler | |
import reactor.util.retry.Retry | |
import java.time.Duration | |
import java.util.concurrent.TimeoutException | |
import java.util.concurrent.atomic.AtomicLong | |
/**
 * Base class for event handlers whose events must be:
 * - handled using parallel processing
 * - partitioned into sequences according to a key, so that events sharing the same key are handled in order
 *   (avoiding concurrent modification of downstream resources)
 * - debounced according to a key, so that rapidly produced similar events do not flood downstream systems
 *
 * The [Scheduler] on which partitions and handlers run is supplied through the constructor.
 * Subclasses may override [retryCount], [backoffMillis], [debounceTimeMillis] and [maxAllowedLagMillis]
 * to replace the values read from [eventHandlerProperties].
 *
 * Events enter the pipeline either through Spring ([onApplicationEvent]) or by calling [emitEvent] directly;
 * both trigger the debounced, parallel, partitioned processing of [handleEvent].
 */
abstract class DebouncingParallelHandler<E : IdentifiedApplicationEvent>(
    val eventHandlerProperties: EventHandlerProperties,
    private val handlerDisablingService: HandlerDisablingService,
    private val scheduler: Scheduler,
) : ApplicationListener<E> {

    // Gauges used only for debug logging: number of partition / debounce groups created and not yet terminated.
    private val partitionCount = AtomicLong(0L)
    private val debounceCount = AtomicLong(0L)

    /** Maximum number of retries applied to a failing [handleEvent] call. */
    open fun retryCount(): Long = eventHandlerProperties.defaultRetryCount

    /** Minimum backoff, in milliseconds, between two retries of [handleEvent]. */
    open fun backoffMillis(): Long = eventHandlerProperties.defaultBackoffMillis

    /**
     * Debounce window in milliseconds. It drives the debounce delay, the partition time-to-live
     * (multiplied by `partitionTtlFactor`) and the threshold above which a handler is reported as too slow.
     */
    open fun debounceTimeMillis(): Long = eventHandlerProperties.debounceTimeMillis

    /** Lag (processing start minus event timestamp), in milliseconds, above which a warning is logged. */
    open fun maxAllowedLagMillis(): Long = eventHandlerProperties.maxAllowedLagMillis

    // Must be declared before the init block below, which subscribes to it.
    private val sink: Sinks.Many<E> = Sinks
        .many()
        .multicast()
        .onBackpressureBuffer(eventHandlerProperties.backpressureBufferSize)

    init {
        // The pipeline is subscribed once, for the lifetime of the handler.
        createFlux(sink.asFlux()).subscribe()
    }

    /** Spring entry point: forwards the application event to [emitEvent]. */
    override fun onApplicationEvent(event: E) = emitEvent(event)

    /**
     * Assembles the processing pipeline: partition by [partitionKey], run the partitions in parallel on
     * [scheduler], debounce each partition by [debounceKey], then handle every surviving event with retries.
     *
     * When the handler is disabled (by configuration at startup, or by [handlerDisablingService] at the time
     * the event is processed) the event is passed through without calling [handleEvent].
     */
    private fun createFlux(source: Flux<E>): Flux<E> {
        // Resolved once instead of on every event; may be null for anonymous subclasses.
        val handlerName = this::class.simpleName
        logger.info { "Initializing debouncing parallel flux for $handlerName" }
        val enabledAtStartup: Boolean = eventHandlerProperties.enable[handlerName] ?: true
        if (!enabledAtStartup) {
            logger.warn { "$handlerName handler is disabled by configuration !" }
        }
        return partition(source)
            .parallel()
            .runOn(scheduler)
            .flatMap { partitionedFlux ->
                debounce(partitionedFlux)
                    .publishOn(scheduler)
                    .flatMap { event ->
                        if (enabledAtStartup && handlerDisablingService.isHandlerEnabled(handlerName ?: "")) {
                            handleEventWithRetry(event)
                        } else {
                            logger.debug { "$handlerName is disabled !" }
                            Mono.just(event)
                        }
                    }
            }
            .sequential()
            .onErrorContinue { throwable, _ ->
                // timeouts are normal, this is how we cancel partitions
                if (throwable !is TimeoutException) {
                    logger.error("Unexpected error while publishing event to sink", throwable)
                }
            }
    }

    /**
     * Wraps [handleEvent] in a deferred [Mono] so that each (re)subscription re-runs the handler, and retries
     * failures with exponential backoff ([retryCount] attempts, starting at [backoffMillis]).
     *
     * Logs the event lag before processing and the processing time afterwards, warning when they exceed
     * [maxAllowedLagMillis] and [debounceTimeMillis] respectively.
     *
     * @return a [Mono] emitting [event] once the handler has completed successfully
     */
    private fun handleEventWithRetry(event: E): Mono<E> = Mono.defer {
        val startTimestamp = System.currentTimeMillis()
        val lag = startTimestamp - event.timestamp
        logger.info { "Processing event ${event.logIdentifier} - lag $lag ms" }
        if (lag > maxAllowedLagMillis()) {
            logger.warn { "Handler is lagging - lag $lag ms" }
        }
        // fromCallable turns an exception thrown by the handler into an error signal that retryWhen can see.
        Mono.fromCallable { handleEvent(event) }
            .doOnSuccess {
                val handlerTime = System.currentTimeMillis() - startTimestamp
                logger.debug { "Processed event ${event.logIdentifier} - time $handlerTime ms" }
                if (handlerTime > debounceTimeMillis()) {
                    logger.warn { "Handler is too slow - time $handlerTime ms" }
                }
            }
    }.retryWhen(
        Retry.backoff(retryCount(), Duration.ofMillis(backoffMillis()))
            .doBeforeRetry { retrySignal ->
                logger.warn(
                    "Retrying: ${retrySignal.totalRetries()}; ${retrySignal.totalRetriesInARow()};",
                    retrySignal.failure(),
                )
            }
    ).thenReturn(event)

    /**
     * Pushes [event] into the processing pipeline.
     *
     * This method never throws: a rejected emission is logged as a warning and the event is dropped,
     * and any exception raised while emitting is logged as an error.
     */
    fun emitEvent(event: E) {
        try {
            val emitResult = sink.tryEmitNext(event)
            if (emitResult.isSuccess) {
                logger.debug { "Emitted event ${event.logIdentifier}" }
            } else {
                logger.warn { "Processing event ${event.logIdentifier} failed with status ${emitResult.name}" }
            }
        } catch (e: Exception) {
            logger.error("Error while publishing to sink ${event.logIdentifier}", e)
        }
    }

    /**
     * Groups [eventFlux] by [debounceKey] and debounces each group with `sampleTimeout`, using a delay of
     * [debounceTimeMillis] milliseconds.
     */
    private fun debounce(eventFlux: Flux<E>): Flux<E> =
        eventFlux
            .groupBy(::debounceKey)
            .doOnNext { flux ->
                val debounceCountValue = debounceCount.incrementAndGet()
                logger.debug { "Created debounce flux for ${flux.key()} - total count $debounceCountValue" }
            }
            .flatMap { flux ->
                flux
                    // Read through the overridable hook so that subclass overrides actually change the window.
                    .sampleTimeout { Mono.delay(Duration.ofMillis(debounceTimeMillis())) }
                    .doAfterTerminate {
                        val debounceCountValue = debounceCount.decrementAndGet()
                        logger.debug { "Terminated debounce flux for ${flux.key()} - total count $debounceCountValue" }
                    }
            }

    /**
     * Groups [eventFlux] by [partitionKey]. Each partition is given a timeout of
     * `partitionTtlFactor * debounceTimeMillis()` milliseconds; the resulting [TimeoutException] is how
     * partitions are cancelled, and it is deliberately not logged by the pipeline's error handler.
     */
    private fun partition(eventFlux: Flux<E>): Flux<Flux<E>> =
        eventFlux
            .groupBy(::partitionKey)
            .doOnNext { flux ->
                val partitionCountValue = partitionCount.incrementAndGet()
                logger.debug { "Created partition flux for ${flux.key()} - total count $partitionCountValue" }
            }
            .map { flux ->
                flux
                    // Evaluated lazily per partition, so the overridable hook is never called during construction.
                    .timeout(Duration.ofMillis(eventHandlerProperties.partitionTtlFactor * debounceTimeMillis()))
                    .doAfterTerminate {
                        val partitionCountValue = partitionCount.decrementAndGet()
                        logger.debug { "Terminated partition flux for ${flux.key()} - total count $partitionCountValue" }
                    }
            }

    /** Key used to debounce events within a partition: events sharing this key are debounced together. */
    abstract fun debounceKey(event: E): String

    /** Key used to partition events: events sharing this key are handled in order. */
    abstract fun partitionKey(event: E): String

    /** Business logic applied to each event that survives debouncing; a thrown exception triggers a retry. */
    abstract fun handleEvent(event: E)

    companion object : KLogging()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment