Skip to content

Instantly share code, notes, and snippets.

@thomasmartin-whoz
Created November 7, 2023 16:16
Show Gist options
  • Save thomasmartin-whoz/38624f35ea7b11ccdf504545b30b74fb to your computer and use it in GitHub Desktop.
Save thomasmartin-whoz/38624f35ea7b11ccdf504545b30b74fb to your computer and use it in GitHub Desktop.
package io.biznet.event.handler
import io.biznet.event.EventHandlerProperties
import io.biznet.event.IdentifiedApplicationEvent
import mu.KLogging
import org.springframework.context.ApplicationListener
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.Sinks
import reactor.core.scheduler.Scheduler
import reactor.util.retry.Retry
import java.time.Duration
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicLong
/**
 * Base class for event handlers whose events must be:
 * - handled using parallel processing
 * - partitioned into sequences according to a key, so that events sharing the same key are handled in order
 *   (avoiding concurrent modification of downstream resources)
 * - debounced according to a key, so that similar events produced in quick succession do not flood
 *   downstream systems
 *
 * The [Scheduler] that runs the handlers is supplied through the constructor. Subclasses may override
 * [retryCount], [backoffMillis], [debounceTimeMillis] and [maxAllowedLagMillis]; by default each of them
 * returns the matching value of [eventHandlerProperties].
 *
 * Events enter the pipeline through [onApplicationEvent] or by calling [emitEvent] directly; both trigger
 * the debounced, partitioned, parallel execution of [handleEvent].
 */
abstract class DebouncingParallelHandler<E : IdentifiedApplicationEvent>(
    val eventHandlerProperties: EventHandlerProperties,
    private val handlerDisablingService: HandlerDisablingService,
    private val scheduler: Scheduler,
) : ApplicationListener<E> {

    // Counters of the currently open partition / debounce groups; only reported in debug logs.
    private val partitionCount = AtomicLong(0L)
    private val debounceCount = AtomicLong(0L)

    /** Maximum number of retries applied to a failing [handleEvent] call. */
    open fun retryCount(): Long = eventHandlerProperties.defaultRetryCount

    /** Minimum backoff, in milliseconds, between two retries of [handleEvent]. */
    open fun backoffMillis(): Long = eventHandlerProperties.defaultBackoffMillis

    /**
     * Debounce window in milliseconds. Drives the debounce delay, the partition time-to-live
     * (multiplied by `partitionTtlFactor`) and the "handler is too slow" warning threshold.
     */
    open fun debounceTimeMillis(): Long = eventHandlerProperties.debounceTimeMillis

    /** Lag, in milliseconds, above which a "handler is lagging" warning is logged. */
    open fun maxAllowedLagMillis(): Long = eventHandlerProperties.maxAllowedLagMillis

    // Guards emissions: Reactor sinks reject concurrent tryEmitNext calls instead of queueing them.
    private val emitLock = Any()

    private val sink: Sinks.Many<E> = Sinks
        .many()
        .multicast()
        .onBackpressureBuffer(eventHandlerProperties.backpressureBufferSize)

    init {
        // The pipeline is subscribed once, at construction time; events are pushed later through the sink.
        createFlux(sink.asFlux()).subscribe()
    }

    override fun onApplicationEvent(event: E) = emitEvent(event)

    /**
     * Builds the processing pipeline on top of [source]: partition by [partitionKey], run the partitions
     * in parallel on [scheduler], debounce each partition by [debounceKey], then handle every surviving
     * event with retries.
     *
     * A handler disabled in `eventHandlerProperties.enable` or through [handlerDisablingService] lets
     * the events flow through without calling [handleEvent].
     */
    private fun createFlux(source: Flux<E>): Flux<E> {
        val handlerName = this::class.simpleName
        logger.info { "Initializing debouncing parallel flux for $handlerName" }
        val enabledAtStartup: Boolean = eventHandlerProperties.enable[handlerName] ?: true
        if (!enabledAtStartup) {
            logger.warn { "$handlerName handler is disabled by configuration !" }
        }
        return partition(source)
            .parallel()
            .runOn(scheduler)
            .flatMap { partitionedFlux ->
                debounce(partitionedFlux)
                    .publishOn(scheduler)
                    .flatMap { event ->
                        // The runtime switch is re-evaluated for every event.
                        if (enabledAtStartup && handlerDisablingService.isHandlerEnabled(handlerName ?: "")) {
                            handleEventWithRetry(event)
                        } else {
                            logger.debug { "$handlerName is disabled !" }
                            Mono.just(event)
                        }
                    }
            }
            .sequential()
            .onErrorContinue { throwable, _ ->
                // timeouts are normal, this is how we cancel partitions
                if (throwable !is TimeoutException) {
                    logger.error("Unexpected error while publishing event to sink", throwable)
                }
            }
    }

    /**
     * Runs [handleEvent] for [event], logging the lag and the handling time, and retries failures with
     * an exponential backoff ([retryCount] attempts starting at [backoffMillis]).
     *
     * @return a [Mono] emitting [event] once it has been handled successfully
     */
    private fun handleEventWithRetry(event: E): Mono<E> =
        Mono.defer {
            // Inside defer so that the timestamps are taken again on every retry.
            val startTimestamp = System.currentTimeMillis()
            val lag = startTimestamp - event.timestamp
            logger.info("Processing event ${event.logIdentifier} - lag $lag ms")
            if (lag > maxAllowedLagMillis()) {
                logger.warn { "Handler is lagging - lag $lag ms" }
            }
            // fromCallable turns an exception thrown by handleEvent into an error signal for retryWhen.
            Mono.fromCallable { handleEvent(event) }
                .doOnSuccess {
                    val handlerTime = System.currentTimeMillis() - startTimestamp
                    logger.debug { "Processed event ${event.logIdentifier} - time $handlerTime ms" }
                    if (handlerTime > debounceTimeMillis()) {
                        logger.warn { "Handler is too slow - time $handlerTime ms" }
                    }
                }
        }.retryWhen(
            Retry.backoff(retryCount(), Duration.ofMillis(backoffMillis()))
                .doBeforeRetry { retrySignal ->
                    logger.warn(
                        "Retrying: ${retrySignal.totalRetries()}; ${retrySignal.totalRetriesInARow()};",
                        retrySignal.failure(),
                    )
                }
        ).then(Mono.just(event))

    /**
     * Pushes [event] into the processing pipeline. Never throws: a rejected emission is logged as a
     * warning and an unexpected exception as an error.
     *
     * May be called from several threads: emissions are serialized, because the sink would otherwise
     * reject overlapping calls with `FAIL_NON_SERIALIZED` and the event would be lost.
     */
    fun emitEvent(event: E) {
        try {
            val emitResult = synchronized(emitLock) { sink.tryEmitNext(event) }
            if (emitResult.isSuccess) {
                logger.debug("Emitted event ${event.logIdentifier}")
            } else {
                logger.warn("Processing event ${event.logIdentifier} failed with status ${emitResult.name}")
            }
        } catch (e: Exception) {
            logger.error("Error while publishing to sink ${event.logIdentifier}", e)
        }
    }

    /**
     * Groups [eventFlux] by [debounceKey] and, inside each group, only lets through an event that has
     * not been followed by another one within [debounceTimeMillis].
     */
    private fun debounce(eventFlux: Flux<E>): Flux<E> =
        eventFlux
            .groupBy(::debounceKey)
            .doOnNext { flux ->
                val debounceCountValue = debounceCount.incrementAndGet()
                logger.debug { "Created debounce flux for ${flux.key()} - total count $debounceCountValue" }
            }
            .flatMap { flux ->
                flux
                    // Uses the overridable accessor so that a subclass override changes the real window.
                    .sampleTimeout { Mono.delay(Duration.ofMillis(debounceTimeMillis())) }
                    .doAfterTerminate {
                        val debounceCountValue = debounceCount.decrementAndGet()
                        logger.debug { "Terminated debounce flux for ${flux.key()} - total count $debounceCountValue" }
                    }
            }

    /**
     * Groups [eventFlux] by [partitionKey]. Each partition is terminated with a [TimeoutException] once
     * it has stayed idle for `partitionTtlFactor` times [debounceTimeMillis].
     */
    private fun partition(eventFlux: Flux<E>): Flux<Flux<E>> =
        eventFlux
            .groupBy(::partitionKey)
            .doOnNext { flux ->
                val partitionCountValue = partitionCount.incrementAndGet()
                logger.debug { "Created partition flux for ${flux.key()} - total count $partitionCountValue" }
            }
            .map { flux ->
                flux
                    // TTL follows the effective debounce window, keeping both values consistent.
                    .timeout(Duration.ofMillis(eventHandlerProperties.partitionTtlFactor * debounceTimeMillis()))
                    .doAfterTerminate {
                        val partitionCountValue = partitionCount.decrementAndGet()
                        logger.debug { "Terminated partition flux for ${flux.key()} - total count $partitionCountValue" }
                    }
            }

    /** Key of the debounce group of [event]: events sharing it within the debounce window are collapsed. */
    abstract fun debounceKey(event: E): String

    /** Key of the partition of [event]: events sharing it are routed to the same partition flux. */
    abstract fun partitionKey(event: E): String

    /** Handles one event; a thrown exception triggers the retry policy. */
    abstract fun handleEvent(event: E)

    companion object : KLogging()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment