Skip to content

Instantly share code, notes, and snippets.

@larroy
Forked from joa/go.scala
Last active August 29, 2015 14:05
Show Gist options
  • Save larroy/1f2f448fcb09a0e22099 to your computer and use it in GitHub Desktop.
Save larroy/1f2f448fcb09a0e22099 to your computer and use it in GitHub Desktop.
package go
import java.util.concurrent.{ArrayBlockingQueue => JArrayBlockingQueue, BlockingQueue => JBlockingQueue, TimeUnit}
object Channel {
def empty[A]: Channel[A] = new BlockingChannel()
def make[A]: Channel[A] = make(1)
def make[A](capacity: Int): Channel[A] = new BlockingChannel(capacity)
}
trait ChannelOps {
@volatile private var _closed = false
def close() {
_closed = true
}
def closed: Boolean = _closed
def open: Boolean = !closed
def nonEmpty: Boolean = !isEmpty
def isEmpty: Boolean
}
trait ReceiveChannel[A] extends ChannelOps {
def unary_! : Option[A]
def foreach[U](f: Option[A] => U) {
while (open || nonEmpty) {
f(this.unary_!)
}
}
def forall(f: Option[A] => Boolean): Boolean = {
while (open || nonEmpty) {
if (!f(this.unary_!)) {
return false
}
}
true
}
}
trait SendChannel[A] extends ChannelOps {
def !(value: A)
}
trait Channel[A] extends ReceiveChannel[A] with SendChannel[A]
class BlockingChannel[A](capacity: Int = 1) extends Channel[A] {
private val blockingQueue: JBlockingQueue[A] =
new JArrayBlockingQueue(capacity)
override def isEmpty: Boolean = blockingQueue.isEmpty
override def !(value: A) {
if (open) {
blockingQueue.put(value)
}
}
override def unary_! : Option[A] = {
while (true) {
if (closed && isEmpty) {
return None
} else {
val ret = blockingQueue.poll(4, TimeUnit.SECONDS)
// scalastyle:off null
if (ret != null)
return Some(ret)
// scalastyle:on null
}
}
assert(false)
None
}
}
object Go {
import scala.concurrent._
def apply[T](f: => T)(implicit execctx: ExecutionContext): Future[T] = future {
f
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment