@trane
Created August 10, 2015 22:47
package com.twitter.zipkin.collector

import com.twitter.conversions.time._
import com.twitter.finagle.stats.InMemoryStatsReceiver
import com.twitter.util.{Await, Duration, Future, FuturePool, Try}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest._

/**
 * Tests the BlockingItemQueue to make sure that it can store and consume elements even when adding
 * more elements than its initial capacity
 */
class BlockingItemQueueTest extends FunSuite {

  val Item = ()
  val latch = new CountDownLatch(1)

  def fill(queue: BlockingItemQueue[Unit, Unit], items: Int): Future[Boolean] = {
    val results = (0 until items) map { _ =>
      queue.add(Item) transform { e => Future.value(e) }
    }
    Future.collect(results).map(_.forall(_.isReturn))
  }

  test("Sleeps on a max queue and waits for the worker to drain") {
    val timeout = 5.seconds
    val stats: InMemoryStatsReceiver = new InMemoryStatsReceiver()
    val queueSize = 10
    val queue = new BlockingItemQueue[Unit, Unit](queueSize, 1, fallingBehindWorker, 100.millis,
      100.millis, stats)
    // fill the queue
    assert(Await.result(fill(queue, queueSize)))
    // validate that the queue is at capacity
    assert(stats.counter("queueFull").apply() >= 1)
    // ensure that if we add to the queue and time out, we fail
    assert(Try { Await.result(queue.add(Item), Duration(1, TimeUnit.MILLISECONDS)) }.isThrow)
    // we haven't processed an item yet
    assert(stats.counter("successes").apply() == 0)
    // allow a Future item to be added once we unleash the workers
    queue.add(Item)
    // unleash the workers
    latch.countDown()
    Await.ready(queue.close())
    // ensure the queue is drained and that we have processed 11 items
    assert(queue.size() == 0)
    assert(stats.counter("successes").apply() == queueSize + 1)
  }

  // Simulates a worker that falls behind: each item is held until the latch is released
  def fallingBehindWorker(param: Unit): Future[Unit] = {
    Future { latch.await(); param }
  }
}
@jamescway

Line 34 (the `queueFull` assertion): `queueFull` doesn't get set, because the counter is not incremented unless you go over capacity.
I tried adding another `queue.add` to trigger the counter, but it blocks, because it's a synchronous Twitter Future, not a FutureTask. It seems like it needs to be executed inside an executor (maybe Future.unbounded..).

@jamescway

Also, line 40 (the `queue.add(Item)` call) blocks as well, so line 42 (`latch.countDown()`) never executes.
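
The executor idea raised in these comments could be sketched roughly as follows. This is only an illustration, not the gist author's fix: `PooledWorkerSketch` and `pooledWorker` are hypothetical names, and it assumes the blocking comes from `Future { ... }` evaluating its body on the calling thread. Whether this alone unblocks the test depends on how `BlockingItemQueue` invokes its worker, which is not shown here.

```scala
import com.twitter.util.{Await, Duration, Future, FuturePool}
import java.util.concurrent.CountDownLatch

object PooledWorkerSketch {
  val latch = new CountDownLatch(1)

  // Hypothetical variant of fallingBehindWorker: the body runs on
  // FuturePool.unboundedPool's threads, so the caller gets a Future back
  // immediately instead of waiting on the latch itself.
  def pooledWorker(param: Unit): Future[Unit] =
    FuturePool.unboundedPool { latch.await(); param }

  def main(args: Array[String]): Unit = {
    val pending = pooledWorker(())   // returns without waiting on the latch
    assert(!pending.isDefined)       // the worker is still parked on the latch
    latch.countDown()                // release the worker from the calling thread
    Await.result(pending, Duration.fromSeconds(5))
    assert(pending.isDefined)        // the worker has now finished
  }
}
```

With a worker shaped like this, the thread that calls `queue.add` would stay free to reach `latch.countDown()`, assuming the queue does not block for some other reason.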
