Skip to content

Instantly share code, notes, and snippets.

@arindamxd
Created October 20, 2023 19:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save arindamxd/d72746493e384d54852b4460ce317d13 to your computer and use it in GitHub Desktop.
Save arindamxd/d72746493e384d54852b4460ce317d13 to your computer and use it in GitHub Desktop.
TaskScheduler: A background task scheduler with a delay and override feature.
package com.arindam.scheduler
import android.os.Handler
import android.os.Looper
import android.util.Log
import java.util.concurrent.CancellationException
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit
/**
* A helper class that allows to schedule/queue Runnables on a single threaded background queue.
*
* Created by Arindam Karmakar on 20/10/23.
*/
object TaskScheduler {
/** Log tag, also embedded in error messages: the class name of [TaskScheduler]. */
private val tag: String = TaskScheduler::class.java.name
/**
 * A unit of work queued to run later on the [TaskScheduler] executor.
 *
 * Instances are created and started by the scheduler. While an instance is still pending it can
 * be withdrawn through [cancel].
 */
class DelayedTask(private val task: Runnable) {
    // Handle returned by executor.schedule(). A null value means the task is not pending:
    // it was never started, has already run, or has been cancelled.
    private var scheduledFuture: ScheduledFuture<*>? = null

    /**
     * Hands this task to the executor so that it fires after [delayMs] milliseconds.
     * The scheduler calls this right after constructing the task.
     */
    internal fun start(delayMs: Long) {
        scheduledFuture = executor.schedule(Runnable { handleDelayElapsed() }, delayMs)
    }

    /**
     * Cancels the task unless it has already run or been cancelled.
     *
     * When called from a task that is itself running on the [TaskScheduler] thread, and this
     * task has not run yet, it is guaranteed that this task will not run.
     */
    internal fun cancel() {
        val pending = scheduledFuture ?: return
        // The result of Future.cancel() is deliberately ignored: markDone() clears
        // scheduledFuture, which turns handleDelayElapsed() into a no-op anyway.
        pending.cancel(/* mayInterruptIfRunning = */ false)
        markDone()
    }

    /** Executor callback: runs the wrapped task unless it was cancelled in the meantime. */
    private fun handleDelayElapsed() {
        verifyIsCurrentThread()
        if (scheduledFuture == null) return
        markDone()
        task.run()
    }

    /** Flags this task as finished and asks the [TaskScheduler] to drop it from its pending list. */
    private fun markDone() {
        if (scheduledFuture == null) {
            Log.e(tag, "Caller should have verified scheduledFuture is non-null.")
        }
        scheduledFuture = null
        removeDelayedTask(this)
    }
}
/**
 * An [Executor] backed by a single-threaded [ScheduledThreadPoolExecutor] that provides:
 *
 * 1. Synchronized task scheduling: [execute], [schedule] and [shutdownNow] all hold this
 *    object's monitor. This is separate from point 3, which is about task execution.
 * 2. A shutdown guard: once [shutdownNow] has been called, [execute] and [schedule] drop new
 *    work instead of handing it to the terminated pool.
 * 3. Single-threaded execution: the `Runnable`s submitted here never run concurrently with
 *    each other.
 */
private class SynchronizedShutdownAwareExecutor : Executor {
    /** The single-threaded pool that actually runs the submitted work. */
    private val internalExecutor: ScheduledThreadPoolExecutor

    /**
     * Whether shutdown has been initiated; once set it is never cleared. It is only read and
     * written inside `@Synchronized` members, so this object's monitor guards it.
     */
    private var isShuttingDown = false

    /**
     * The single thread used by the pool. It is created eagerly and kept here so that callers
     * can later assert that they are executing on it.
     */
    val thread: Thread

    /**
     * A [ThreadFactory] that always hands out the one pre-created [thread].
     *
     * The factory itself is that thread's `Runnable`: [run] parks on a latch until the pool
     * asks for a thread through [newThread], which supplies the runnable to delegate to.
     */
    private inner class DelayedStartFactory : Runnable, ThreadFactory {
        private val latch = CountDownLatch(1)
        private var delegate: Runnable? = null

        override fun run() {
            try {
                latch.await()
            } catch (e: InterruptedException) {
                // Preserve the interrupt status for whoever inspects it next.
                Thread.currentThread().interrupt()
            }
            delegate?.run()
        }

        override fun newThread(runnable: Runnable): Thread {
            if (delegate != null) Log.e(tag, "Only one thread may be created in an $tag.")
            delegate = runnable
            latch.countDown()
            return thread
        }
    }

    init {
        val threadFactory = DelayedStartFactory()
        thread = Executors.defaultThreadFactory().newThread(threadFactory).apply {
            name = "TokenRefreshWorker"
            isDaemon = true
            uncaughtExceptionHandler = Thread.UncaughtExceptionHandler { _, throwable ->
                panic(throwable)
            }
        }
        internalExecutor = object : ScheduledThreadPoolExecutor(1, threadFactory) {
            /**
             * Escalates task failures to panic(). A failure arrives either directly as [t] or
             * wrapped inside the completed [Future] that [r] represents.
             */
            override fun afterExecute(r: Runnable?, t: Throwable?) {
                super.afterExecute(r, t)
                var failure: Throwable? = t
                if (failure == null && r is Future<*>) {
                    try {
                        // Not all Futures will be done, e.g. when used with scheduleAtFixedRate.
                        if (r.isDone) r.get()
                    } catch (ce: CancellationException) {
                        // Cancellation is expected from time to time and is not an error.
                    } catch (ee: ExecutionException) {
                        failure = ee.cause
                    } catch (e: InterruptedException) {
                        Thread.currentThread().interrupt()
                    }
                }
                failure?.let { panic(it) }
            }
        }
        // Core threads don't time out; this only takes effect once the number of required
        // core threads is lowered.
        internalExecutor.setKeepAliveTime(3, TimeUnit.SECONDS)
    }

    /**
     * Runs [command] on the backing pool, unless shutdown has been initiated, in which case the
     * command is silently dropped.
     */
    @Synchronized
    override fun execute(command: Runnable) {
        if (!isShuttingDown) internalExecutor.execute(command)
    }

    /**
     * Wraps [ScheduledThreadPoolExecutor.schedule] with a shutdown check.
     *
     * @param command The work to run.
     * @param delay The delay in milliseconds.
     * @return The future for the scheduled command, or `null` if shutdown has been initiated
     *   and the command was therefore not scheduled.
     */
    @Synchronized
    fun schedule(command: Runnable, delay: Long): ScheduledFuture<*>? {
        return if (!isShuttingDown) internalExecutor.schedule(command, delay, TimeUnit.MILLISECONDS) else null
    }

    /**
     * Wraps [ScheduledThreadPoolExecutor.shutdownNow] and records that shutdown has started, so
     * that later [execute]/[schedule] calls are dropped rather than rejected by the pool.
     *
     * @return The tasks that were awaiting execution.
     */
    @Synchronized
    fun shutdownNow(): MutableList<Runnable> {
        isShuttingDown = true
        return internalExecutor.shutdownNow()
    }

    /** Wraps [ScheduledThreadPoolExecutor.setCorePoolSize]. */
    fun setCorePoolSize(size: Int) {
        internalExecutor.corePoolSize = size
    }
}
/** The executor backing this [TaskScheduler]. */
private val executor: SynchronizedShutdownAwareExecutor = SynchronizedShutdownAwareExecutor()

// Tasks scheduled to run in the future. A task is removed once it has run or been canceled.
// The list is touched from two sides: callers of scheduleTask()/cancelTask(), and the executor
// thread, where a finishing DelayedTask removes itself. A copy-on-write list keeps those
// accesses safe and allows iteration while entries are being removed. The list is expected to
// stay small, so the cost of copying on each write is negligible.
private val delayedTasks: MutableList<DelayedTask> = CopyOnWriteArrayList()
/**
 * Schedules [task] to run on the scheduler thread after [delayMs] milliseconds.
 *
 * @param override When `true`, every task that is still pending is cancelled first.
 * @param delayMs The delay, in milliseconds, after which the task will run.
 * @param task The task to run.
 * @return The [DelayedTask] tracking the scheduled work.
 */
fun scheduleTask(override: Boolean = true, delayMs: Long, task: Runnable): DelayedTask {
    if (override && isTaskScheduled()) cancelTask()
    // Register before starting. Once started, the executor thread may finish the task and try
    // to remove it from delayedTasks at any moment; adding it afterwards could leave an
    // already-finished task in the list and trigger a "Delayed task not found." log.
    val delayedTask = DelayedTask(task)
    delayedTasks.add(delayedTask)
    delayedTask.start(delayMs)
    return delayedTask
}
/** Reports whether at least one delayed task is currently registered with the scheduler. */
internal fun isTaskScheduled(): Boolean {
    return !delayedTasks.isEmpty()
}
/**
 * Cancels every delayed task that is currently registered.
 *
 * @return `false` if there was nothing registered, `true` otherwise.
 */
internal fun cancelTask(): Boolean {
    if (delayedTasks.isEmpty()) return false
    // Iterate over a snapshot: DelayedTask.cancel() removes the task from delayedTasks, and
    // mutating an ArrayList while iterating it throws ConcurrentModificationException or
    // silently skips entries.
    for (task in delayedTasks.toList()) {
        task.cancel()
        // cancel() does nothing for a task whose future is already cleared, so make sure such
        // leftovers are dropped as well. This is a no-op if cancel() already removed the task.
        delayedTasks.remove(task)
    }
    return true
}
/**
 * Shuts down the [TaskScheduler] by shrinking the executor's core pool to zero threads.
 *
 * This makes the executor de-reference its threads, which is the best that can be done here.
 * The scheduler is not meant to make progress again afterwards.
 */
internal fun shutdown() = executor.setCorePoolSize(0)
/**
 * Builds a [DelayedTask] around [task] and starts it, so that it fires on the executor once
 * [delayMs] has elapsed.
 *
 * @param delayMs The delay (ms) before the task should run.
 * @param task The task to run.
 * @return The started [DelayedTask].
 */
private fun scheduleDelayedTask(delayMs: Long, task: Runnable): DelayedTask {
    val delayedTask = DelayedTask(task)
    delayedTask.start(delayMs)
    return delayedTask
}
/**
 * Drops [task] from the list of pending delayed tasks. [DelayedTask] calls this on itself once
 * it has run or been cancelled; an error is logged if the task was not in the list.
 */
private fun removeDelayedTask(task: DelayedTask) {
    if (delayedTasks.remove(task)) return
    Log.e(tag, "Delayed task not found.")
}
/**
 * Asserts that the caller is running on the thread managed by the [TaskScheduler] executor.
 *
 * @throws RuntimeException if called from any other thread.
 */
private fun verifyIsCurrentThread() {
    val expected = executor.thread
    val actual = Thread.currentThread()
    if (expected === actual) return
    val message = String.format(
        "We are running on the wrong thread. Expected to be on the %s thread %s/%d but was %s/%d",
        tag, expected.name, expected.id, actual.name, actual.id
    )
    throw RuntimeException(message)
}
/**
 * Stops the executor right away and then crashes the app by throwing from a runnable posted to
 * the main looper. Intended for unrecoverable errors only.
 *
 * @param t The Throwable that caused the panic.
 */
// TODO: Check If this is required, if not, handle it properly
private fun panic(t: Throwable?) {
    executor.shutdownNow()
    Handler(Looper.getMainLooper()).post {
        throw when (t) {
            // OOMs can happen if developers try to load too much data at once. Rather than
            // reporting an internal error, hint that excessive queries in TaskScheduler may be
            // the cause.
            is OutOfMemoryError ->
                OutOfMemoryError("$tag ran out of memory. Check your queries to make sure they are not loading an excessive amount of data.")
                    .apply { initCause(t) }
            else -> RuntimeException("Internal error in $tag.", t)
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment