Create a gist now

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import org.slf4j.LoggerFactory
import org.springframework.http.HttpStatus
import org.springframework.web.bind.annotation.ResponseStatus
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink
import reactor.core.publisher.Mono
import reactor.core.publisher.MonoSink
import reactor.core.scheduler.Scheduler
import java.util.concurrent.Callable
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
class TenantTaskCoordinator(private val scheduler: Scheduler,
val maximumConcurrency: Int = 1,
val maximumQueueSize: Int = 50,
val name: String = "") : AutoCloseable {
private val maximumWorkInProgress = maximumQueueSize + maximumConcurrency
private val maxBufferSize = maximumWorkInProgress * 2
val currentWorkInProgressCounter = AtomicInteger()
private lateinit var taskSink: FluxSink<Task>
private val taskSource = Flux.create<Task>({ taskSink = it }, FluxSink.OverflowStrategy.BUFFER)
private data class Task(val name: String,
private val _work: Mono<Any>,
val outsideSink: MonoSink<Any>,
@field:Volatile var isCancelled: Boolean = false) {
lateinit var outsideTimeoutSink: MonoSink<Task>
val work: Mono<Any> get() = if (isCancelled) Mono.empty() else _work
val outsideTimeout = Mono.create<Task> { outsideTimeoutSink = it }
fun outsideCancel() {
LOG.trace("Task.outsideCancel {} {}", System.identityHashCode(this), this)
isCancelled = true
outsideTimeoutSink.success(this)
}
fun onError(error: Throwable) {
LOG.warn("Task.onError {}", this, error)
outsideSink.error(error)
}
fun onSuccess(result: Any?) {
try {
outsideSink.success(result)
} catch (ex: Exception) {
LOG.warn("failed in onSuccess {} result={} error", this, result, ex)
}
}
}
private val processSinkOnErrorResume = processSinkWithLimitedConcurrency()
.onErrorResume { error: Throwable? ->
LOG.warn("name={} Error processing sink with limited concurrency", name, error)
processSinkWithLimitedConcurrency()
}
private fun processSinkWithLimitedConcurrency(): Flux<Any> {
return taskSource
.filter { !it.isCancelled }
.flatMap({ job ->
job.work
.doOnError(job::onError)
.doOnSuccess(job::onSuccess)
.subscribeOn(scheduler)
.timeout(job.outsideTimeout)
.onErrorReturn(job)
}, maximumConcurrency, maxBufferSize)
}
private val disposer = processSinkOnErrorResume.subscribe()
@Suppress("UNCHECKED_CAST")
fun <T : Any> execute(job: Callable<T>): Mono<T> {
return Mono.create({ outsideSink ->
val _workInProgressWasDecremented = AtomicBoolean(false)
fun decrementOnce() {
if (_workInProgressWasDecremented.compareAndSet(false, true)) {
currentWorkInProgressCounter.decrementAndGet()
}
}
val workInProgress = currentWorkInProgressCounter.incrementAndGet()
if (workInProgress > maximumWorkInProgress) {
outsideSink.error(TooManyTasks("Current work in progress $workInProgress exceeds $maximumWorkInProgress jobs in $name"))
decrementOnce()
} else {
val singleJob = Mono.fromCallable(job).doAfterTerminate {
decrementOnce()
}
val delayedTask = Task(name, singleJob as Mono<Any>, outsideSink as MonoSink<Any>)
outsideSink.onCancel {
delayedTask.outsideCancel()
decrementOnce()
}
LOG.trace("sink.next({} {})", System.identityHashCode(delayedTask), delayedTask)
taskSink.next(delayedTask)
}
})
}
override fun close() {
scheduler.dispose()
disposer.dispose()
}
override fun toString(): String {
return "TenantTaskCoordinator(scheduler=$scheduler, maximumConcurrency=$maximumConcurrency, maximumQueueSize=$maximumQueueSize, name='$name')"
}
companion object {
private val LOG = LoggerFactory.getLogger(TenantTaskCoordinator::class.java)
}
@ResponseStatus(HttpStatus.TOO_MANY_REQUESTS)
class TooManyTasks(message: String) : RuntimeException(message)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment