import org.slf4j.LoggerFactory | |
import org.springframework.http.HttpStatus | |
import org.springframework.web.bind.annotation.ResponseStatus | |
import reactor.core.publisher.Flux | |
import reactor.core.publisher.FluxSink | |
import reactor.core.publisher.Mono | |
import reactor.core.publisher.MonoSink | |
import reactor.core.scheduler.Scheduler | |
import java.util.concurrent.Callable | |
import java.util.concurrent.atomic.AtomicBoolean | |
import java.util.concurrent.atomic.AtomicInteger | |
/**
 * Runs submitted jobs on [scheduler] with bounded concurrency and a bounded backlog.
 *
 * Jobs handed to [execute] are pushed into an internal [Flux] that is drained by a single
 * long-lived subscription running at most [maximumConcurrency] jobs at a time. A job
 * submitted while more than `maximumQueueSize + maximumConcurrency` jobs would be in flight
 * is rejected with [TooManyTasks].
 *
 * @property maximumConcurrency maximum number of jobs executed at the same time
 * @property maximumQueueSize number of additional jobs allowed to wait for a free slot
 * @property name label used in log and error messages
 */
class TenantTaskCoordinator(private val scheduler: Scheduler,
                            val maximumConcurrency: Int = 1,
                            val maximumQueueSize: Int = 50,
                            val name: String = "") : AutoCloseable {

    /** Upper bound on accepted-but-unfinished jobs (running plus waiting). */
    private val maximumWorkInProgress = maximumQueueSize + maximumConcurrency

    /** Passed to `flatMap` as its prefetch argument. */
    private val maxBufferSize = maximumWorkInProgress * 2

    /** Number of jobs currently accepted and not yet finished, cancelled or rejected. */
    val currentWorkInProgressCounter = AtomicInteger()

    // Assigned when `disposer` subscribes to `taskSource` further down in this class.
    private lateinit var taskSink: FluxSink<Task>
    private val taskSource = Flux.create<Task>({ taskSink = it }, FluxSink.OverflowStrategy.BUFFER)

    /**
     * One queued unit of work together with the sink of the caller waiting for its result.
     *
     * @property outsideSink sink of the `Mono` returned to the caller of [execute]
     * @property isCancelled set once the caller cancelled its subscription
     */
    private data class Task(val name: String,
                            private val _work: Mono<Any>,
                            val outsideSink: MonoSink<Any>,
                            @field:Volatile var isCancelled: Boolean = false) {

        // Stays null until the processing pipeline subscribes to [outsideTimeout]. A task can
        // be cancelled while it is still queued, i.e. before that subscription exists, so
        // this must not be a `lateinit` that is accessed unconditionally.
        @Volatile
        private var outsideTimeoutSink: MonoSink<Task>? = null

        /** The actual work, or an empty `Mono` when the task was cancelled before starting. */
        val work: Mono<Any> get() = if (isCancelled) Mono.empty() else _work

        /** Emits this task once the caller cancels; used as the trigger of `timeout`. */
        val outsideTimeout: Mono<Task> = Mono.create<Task> { sink ->
            outsideTimeoutSink = sink
            // A cancel that raced with this subscription saw a null sink; signal it now.
            if (isCancelled) {
                sink.success(this)
            }
        }

        /** Marks the task cancelled and, if it is already running, triggers its timeout. */
        fun outsideCancel() {
            LOG.trace("Task.outsideCancel {} {}", System.identityHashCode(this), this)
            isCancelled = true
            outsideTimeoutSink?.success(this)
        }

        /** Forwards a failure of the work to the waiting caller. */
        fun onError(error: Throwable) {
            LOG.warn("Task.onError {}", this, error)
            outsideSink.error(error)
        }

        /** Forwards the result of the work to the waiting caller; failures are only logged. */
        fun onSuccess(result: Any?) {
            try {
                outsideSink.success(result)
            } catch (ex: Exception) {
                LOG.warn("failed in onSuccess {} result={} error", this, result, ex)
            }
        }
    }

    // If the processing pipeline itself fails, log and switch to a freshly built pipeline.
    private val processSinkOnErrorResume = processSinkWithLimitedConcurrency()
            .onErrorResume { error: Throwable? ->
                LOG.warn("name={} Error processing sink with limited concurrency", name, error)
                processSinkWithLimitedConcurrency()
            }

    /**
     * Builds the pipeline that drains [taskSource]: tasks already cancelled are dropped, the
     * rest are run on [scheduler] with at most [maximumConcurrency] in parallel.
     */
    private fun processSinkWithLimitedConcurrency(): Flux<Any> {
        return taskSource
                .filter { !it.isCancelled }
                .flatMap({ job ->
                    job.work
                            .doOnError(job::onError)
                            .doOnSuccess(job::onSuccess)
                            .subscribeOn(scheduler)
                            // Abandon the work as soon as the caller cancels.
                            .timeout(job.outsideTimeout)
                            // Errors were already handed to the caller; keep the pipeline alive.
                            .onErrorReturn(job)
                }, maximumConcurrency, maxBufferSize)
    }

    // Subscribing here starts the processing and initializes `taskSink`.
    private val disposer = processSinkOnErrorResume.subscribe()

    /**
     * Queues [job] for execution and returns a `Mono` that delivers its result.
     *
     * Nothing is queued until the returned `Mono` is subscribed. The `Mono` fails with
     * [TooManyTasks] when the work-in-progress limit is exceeded. Cancelling the
     * subscription cancels the task and frees its slot, whether the task is still queued or
     * already running.
     *
     * @param job the work to run on the coordinator's scheduler
     * @return a `Mono` emitting the value returned by [job], or its error
     */
    @Suppress("UNCHECKED_CAST")
    fun <T : Any> execute(job: Callable<T>): Mono<T> {
        return Mono.create({ outsideSink ->
            val slotReleased = AtomicBoolean(false)

            // Completion and cancellation may both fire; release the slot exactly once.
            fun decrementOnce() {
                if (slotReleased.compareAndSet(false, true)) {
                    currentWorkInProgressCounter.decrementAndGet()
                }
            }

            val workInProgress = currentWorkInProgressCounter.incrementAndGet()
            if (workInProgress > maximumWorkInProgress) {
                outsideSink.error(TooManyTasks("Current work in progress $workInProgress exceeds $maximumWorkInProgress jobs in $name"))
                decrementOnce()
            } else {
                val singleJob = Mono.fromCallable(job).doAfterTerminate {
                    decrementOnce()
                }
                val delayedTask = Task(name, singleJob as Mono<Any>, outsideSink as MonoSink<Any>)
                outsideSink.onCancel {
                    try {
                        delayedTask.outsideCancel()
                    } finally {
                        // Always free the slot, even if signalling the cancel fails.
                        decrementOnce()
                    }
                }
                LOG.trace("sink.next({} {})", System.identityHashCode(delayedTask), delayedTask)
                taskSink.next(delayedTask)
            }
        })
    }

    /** Disposes the scheduler and the subscription that drains the task queue. */
    override fun close() {
        scheduler.dispose()
        disposer.dispose()
    }

    override fun toString(): String {
        return "TenantTaskCoordinator(scheduler=$scheduler, maximumConcurrency=$maximumConcurrency, maximumQueueSize=$maximumQueueSize, name='$name')"
    }

    /** Signalled to callers of [execute] when the work-in-progress limit is exceeded. */
    @ResponseStatus(HttpStatus.TOO_MANY_REQUESTS)
    class TooManyTasks(message: String) : RuntimeException(message)

    companion object {
        private val LOG = LoggerFactory.getLogger(TenantTaskCoordinator::class.java)
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment