Skip to content

Instantly share code, notes, and snippets.

@scf37
Created April 16, 2024 13:55
Show Gist options
  • Save scf37/4071839f25e197e38e4b070cfbed977b to your computer and use it in GitHub Desktop.
Save scf37/4071839f25e197e38e4b070cfbed977b to your computer and use it in GitHub Desktop.
Emulated time on Twitter Futures
import com.twitter.util.Await
import com.twitter.util.Closable
import com.twitter.util.Duration
import com.twitter.util.Future
import com.twitter.util.Promise
import com.twitter.util.Time
import scala.collection.mutable
/**
 * Timer instance that emulates time flow.
 * Very useful for testing asynchronous code.
 *
 * Scheduled callbacks are kept in a priority queue and are only executed when the test
 * explicitly drives the timer via [[tick]], [[tickAll]], [[result]] or [[waitFor]];
 * the internal clock jumps straight to the start time of the task being executed.
 *
 * @param startTime start timestamp for this timer, in milliseconds
 */
class MockTimer(startTime: Long = System.currentTimeMillis()) extends Timer {
  // Upper bound of emulated time a single tickAll()/result() call may consume.
  // Computed in Long arithmetic on purpose: the same product evaluated on Int literals
  // silently overflows and yields roughly 17 days instead of a year.
  private[this] val oneYearMillis: Long = 365L * 24 * 60 * 60 * 1000

  private[this] var closed = false
  private[this] var now: Long = startTime
  private[this] val tasks = mutable.PriorityQueue.empty[Task]

  /**
   * Execute the next scheduled task, moving the internal clock to that task's start time.
   *
   * @return true if a task was executed, false if nothing was scheduled
   */
  def tick(): Boolean = synchronized {
    if (tasks.isEmpty) {
      false
    } else {
      val next = tasks.dequeue()
      now = next.startAt
      next.task()
      true
    }
  }

  /**
   * Consume all scheduled tasks, or fail if there is no end to them.
   *
   * Throws a RuntimeException once more than one year of emulated time has been
   * consumed by this call.
   */
  def tickAll(): Unit = {
    val startedAt = nowMillis
    while (tick()) {
      if (nowMillis - startedAt > oneYearMillis) {
        throw new RuntimeException("One year of emulated time has passed. Unable to consume all scheduled tasks.")
      }
    }
  }

  /**
   * Return the completed value of the given future. Behaves like Await.result with a
   * one year timeout, except that it uses emulated time instead of real time, i.e. it
   * keeps executing scheduled tasks until the future is defined.
   *
   * Throws a RuntimeException if the task queue runs dry before the future is defined,
   * or if more than one year of emulated time has been consumed by this call.
   *
   * @param a future to wait for
   * @tparam T type of the value produced by the future
   * @return the value of the completed future
   */
  def result[T](a: Future[T]): T = {
    val startedAt = nowMillis
    while (!a.isDefined) {
      if (!tick()) {
        throw new RuntimeException("This Future will never complete")
      }
      if (nowMillis - startedAt > oneYearMillis) {
        throw new RuntimeException("One year of emulated time have passed. This Future will probably never complete.")
      }
    }
    // The future is already defined at this point.
    Await.result(a)
  }

  /**
   * Execute tasks scheduled for the interval from now to now + delay and leave the
   * clock at now + delay.
   *
   * NOTE(review): the loop stops as soon as the clock reaches the deadline, so when
   * several tasks are due exactly at the deadline only the first one is executed here.
   *
   * @param delay amount of emulated time to advance
   */
  def waitFor(delay: Duration): Unit = synchronized {
    val until = now + delay.inMillis
    while (now < until) {
      if (tasks.nonEmpty && tasks.head.startAt <= until) {
        tick()
      } else {
        // Nothing (more) is due before the deadline: jump straight to it.
        now = until
      }
    }
  }

  /**
   * Execute callback after the specified delay and return its execution result.
   *
   * The callback is only evaluated when the timer is driven to the task; the returned
   * future then becomes the future produced by the callback.
   *
   * @param delay emulated delay before the callback is run
   * @param task  callback to run
   * @return future linked to the callback's result
   */
  override def schedule[T](delay: Duration)(task: => Future[T]): Future[T] = synchronized {
    val p = new Promise[T]
    val scheduled: Task = Task(now + delay.inMillis, () => {
      p.become(task)
    }, periodic = false)
    tasks += scheduled
    p
  }

  /**
   * Execute callback periodically.
   * 'period' is the delay between task finish (callback future completes) and next task start.
   *
   * @param delay  emulated delay before the first run
   * @param period emulated delay between the end of one run and the start of the next
   * @param task   callback to run
   * @return handle that stops further runs when closed; closing it twice fails the
   *         `require` check
   */
  override def schedule(delay: Duration, period: Duration)(task: => Future[Unit]): Closable = {
    @volatile var stopped = false

    // Schedule a single run after `wait`; once that run's future completes, the next
    // run is scheduled after `period`, unless the handle has been closed meanwhile.
    def runAfter(wait: Duration): Unit = {
      schedule(wait) {
        if (!stopped) {
          task.ensure {
            if (!stopped) runAfter(period)
          }
        }
        Future.Done
      }
      ()
    }

    runAfter(delay)

    new Closable {
      override def close(deadline: Time): Future[Unit] = {
        require(!stopped, "already closed")
        stopped = true
        Future.Done
      }
    }
  }

  /**
   * @return emulated time converted to nanoseconds
   */
  override def nowNano: Long = now * 1000000L

  /**
   * @return emulated time, in milliseconds
   */
  override def nowMillis: Long = now

  /**
   * Mark this timer as closed. Closing it twice fails the `require` check.
   */
  override def close(deadline: Time): Future[Unit] = synchronized {
    require(!closed, "Already closed")
    closed = true
    Future.Done
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment