Skip to content

Instantly share code, notes, and snippets.

@samhendley
Created August 24, 2010 13:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save samhendley/547596 to your computer and use it in GitHub Desktop.
Save samhendley/547596 to your computer and use it in GitHub Desktop.
Forgive me if this issue has been addressed before but I wasn’t able to unearth anything describing this issue on the web or on this list.
I believe there is a bug in the MailBox implementation. If a receiveWithin block timesout the next message is lost. It appears that the lost message is sent to the previous receiveWithin block, though code inside that block isn’t run either. I have a test case that proves this behavior (http://gist.github.com/547596). Is there something I am missing and this is intentional? If so the documentation needs to be updated to warn about this behavior.
In either case I am looking for a replacement to mailbox. I only stumbled onto this behavior because there was no way to peek into a MailBox and see if there is a message waiting. I was simulating that behavior by doing the shortest possible recieveWithin to check for already existing messages and then doing a longer wait once I had consumed all of the available messages and still hadn’t received the message I wanted.
Thanks for your time.
Sam
package org.psi
import org.scalatest.FunSuite
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import scala.concurrent.{ MailBox, TIMEOUT }
/// This test shows that a receiveWithin that timesout causes the next message to be
/// lost and I believe it also leaks the closure but I haven't been able to confirm
/// that.
/// http://lampsvn.epfl.ch/trac/scala/browser/scala/tags/R_2_8_0_final/src/library/scala/concurrent/MailBox.scala
/// The source code seems to indicate the problem is the receiveWithin closure is not
/// removed from the list of applicable recievers when the timeout expires.
@RunWith(classOf[JUnitRunner])
class MailBoxTests extends FunSuite {
case class Test(i: Int)
/// Checks that the next message in the mail box matches what we expect
def checkRecieved(mail: MailBox, value: Int): Boolean = {
mail.receiveWithin(1){
case Test(x) => return (x == value)
case TIMEOUT => return false
}
return false
}
/// this block can only receive the timeout (not the message type!) this works
/// as expected since it doesn't register as consuming the message of interest
def checkTimeoutCantRecieve(mail: MailBox): Boolean = {
mail.receiveWithin(1){
case TIMEOUT => return true
}
return false
}
/// this block can receive either a timeout or the message type (which is the
/// way we would expect to see it used in real code). I have added a println
/// which should show if this closure is still "alive"
def checkTimeoutCanReceive(mail: MailBox): Boolean = {
mail.receiveWithin(1){
case Test(x) => {
println("unexpected test value: " + x)
return false
}
case TIMEOUT => return true
}
return false
}
test("Works with no timeouts"){
val mail = new MailBox
mail.send(new Test(1))
mail.send(new Test(2))
assert(checkRecieved(mail, 1))
assert(checkRecieved(mail, 2))
mail.send(new Test(3))
assert(checkRecieved(mail, 3))
}
test("Passes with timeout that cant recieve"){
val mail = new MailBox
mail.send(new Test(1))
assert(checkRecieved(mail, 1))
assert(checkTimeoutCantRecieve(mail))
mail.send(new Test(2))
assert(checkRecieved(mail, 2))
}
test("Fails because timeout eats message"){
val mail = new MailBox
assert(checkTimeoutCanReceive(mail))
// fails because the timeout test eats the message, should see message on stdout
mail.send(new Test(2))
assert(checkRecieved(mail, 2))
}
test("Passes with sacrificial message"){
val mail = new MailBox
assert(checkTimeoutCanReceive(mail))
// sending an extra message that is sent to the timeout closure fixes the issue
mail.send(new Test(99))
mail.send(new Test(2))
assert(checkRecieved(mail, 2))
}
}
package org.psi
import scala.concurrent.MailBox
/**
* A class that allow us to test multi-threaded code without reproducing the same
* mailbox + recieveWithin machinery
*/
class SyncVar[T <: Any](intialValue: T) {
case class StateChange(value: T)
var current: T = intialValue
val mail = new MailBox
def update(value: T) {
mail.send(StateChange(value))
}
def waitUntil(value: T, msec: Long = 5000): Boolean = {
waitFor(current => current == value, msec)
}
def waitWhile(value: T, msec: Long = 5000): Boolean = {
waitFor(current => current != value, msec)
}
def waitFor(fun: T => Boolean, msec: Long = 5000): Boolean = {
val end = System.currentTimeMillis + msec
while (true) {
var tilExpiry = end - System.currentTimeMillis
if (tilExpiry <= 0) return false
mail.receiveWithin(tilExpiry) {
case StateChange(x) =>
current = x
if (fun(current)) return true
}
}
return false
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment