Created
July 13, 2011 05:07
-
-
Save purefn/1079751 to your computer and use it in GitHub Desktop.
Start of port of Haskell MVar to Scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package scalaz | |
package concurrent | |
import Scalaz._ | |
import Promise._ | |
import Strategy._ | |
import java.util.concurrent._ | |
import locks._ | |
import atomic._ | |
sealed trait MVar[A] { | |
def put(a: Promise[A]): Promise[Unit] | |
def take: Promise[A] | |
// def tryTake: Promise[Option[A]] | |
// def tryPut(a: Promise[A]): Promise[Boolean] | |
// | |
// def peek: Promise[A] | |
// def peekOrPut(a: => A): Promise[A] | |
// | |
// def modify(a: A => A): Promise[A] | |
// def replace(a: A): Promise[A] = modify(_ => a) | |
} | |
object MVar { | |
def apply[A]: Promise[MVar[A]] = promise(new MVar[A] { | |
val value: AtomicReference[Option[Promise[A]]] = new AtomicReference(None) | |
val readLatch = BooleanLatch() | |
val writeLatch = BooleanLatch() | |
def take = read(None, { | |
val a = value.getAndSet(None) | |
writeLatch.release() | |
a | |
}) | |
def put(a: Promise[A]) = promise(write(a, value.get))(Sequential) | |
// def tryTake = | |
// value.get.map(_.map(some(_))).getOrElse(promise(none[A])(Sequential)) | |
// | |
// def tryPut(a: Promise[A]) = value.get.map(_ => false) | |
def read(init: Option[Promise[A]], reader: => Option[Promise[A]]) = { | |
def read_(r: => Option[Promise[A]]): Promise[A] = | |
r getOrElse { | |
// we don't have a value so we wait for someone to put one | |
// ??? If a thread puts a value here before we start the wait, we'll enter the wait when we shouldn't. | |
// This thread will block when we have a value we can use, until someone else takes it and a new value is put. | |
// Is there a way to fix this? | |
readLatch.await() | |
// someone has put a value so now we try to read it | |
read_(reader) | |
} | |
read_(init) | |
} | |
def write(a: Promise[A], r: => Option[Promise[A]]): Unit = | |
r map { _ => | |
// if there is a value, wait until someone takes it | |
// ??? If 1 thread completes a take here before we start the wait, we'll enter the wait when we shouldn't. | |
// This thread will block until another thread writes a value and it is taken. | |
// Is there a way to fix this? | |
writeLatch.await() | |
// someone has taken the value, try and write it again | |
write(a, r) | |
} getOrElse { | |
// There is no value, so it's time to try and write one. | |
// If the value has changed, the write will fail so we'll need to try it again. | |
if (!value.compareAndSet(r, Some(a))) write(a, r) | |
// If the write succeeded, release a thread waiting for a value. | |
else readLatch.release() | |
} | |
}) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment