Skip to content

Instantly share code, notes, and snippets.

@ahmadmo
Last active October 17, 2018 06:39
Show Gist options
  • Save ahmadmo/a1bfa865a6e5ab22d7faca8e2f117989 to your computer and use it in GitHub Desktop.
Save ahmadmo/a1bfa865a6e5ab22d7faca8e2f117989 to your computer and use it in GitHub Desktop.
Go Channels in Scala
import Channel.{Core, Message}
/**
 * Companion of [[Channel]]: public factory methods plus the private plumbing
 * (message envelope and shared queue state) that channel instances are built on.
 */
object Channel {

  /**
   * Envelope stored in the underlying queue.
   *
   * @param value the payload carried by this envelope
   * @param last  `true` only for the end-of-stream marker
   */
  private case class Message[A](value: A, last: Boolean)

  private object Message {
    // Single shared end-of-stream marker; its payload is null and must not be read.
    private val EndOfStream = Message(null, last = true)

    /** The shared end-of-stream marker, viewed as a `Message[A]`. */
    def last[A]: Message[A] = EndOfStream.asInstanceOf[Message[A]]
  }

  /**
   * State shared by every `Channel` view created over the same queue.
   *
   * @param capacity 0 selects a `SynchronousQueue`; a positive value selects an
   *                 `ArrayBlockingQueue` of that size; a negative value is
   *                 rejected with `IllegalArgumentException`
   */
  private class Core[A](capacity: Int) {
    import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, SynchronousQueue}

    val queue: BlockingQueue[Message[A]] =
      if (capacity == 0) new SynchronousQueue[Message[A]]()
      else if (capacity > 0) new ArrayBlockingQueue[Message[A]](capacity)
      else throw new IllegalArgumentException("invalid channel capacity.")

    // Flipped to true by Channel#close(); volatile so other threads observe the change.
    @volatile var closed = false
  }

  /**
   * Creates a channel backed by a fresh queue.
   *
   * @param capacity 0 for a synchronous channel, a positive number for a
   *                 bounded buffer of that size
   * @throws IllegalArgumentException if `capacity` is negative
   */
  def make[A](capacity: Int): Channel[A] = {
    val core = new Core[A](capacity)
    new Channel[A](core)
  }

  /** Creates a channel with capacity 0, i.e. one backed by a `SynchronousQueue`. */
  def synchronous[A]: Channel[A] = make[A](0)
}
/**
 * A Go-style channel backed by a `java.util.concurrent.BlockingQueue`.
 *
 * Several `Channel` instances may share one `core`: [[withFilter]] returns a
 * new view over the same queue and closed flag that differs only in the
 * predicate applied to received values.
 *
 * @param core   queue and closed flag shared by all views of this channel
 * @param filter predicate a received value must satisfy to be delivered;
 *               a value that fails it is consumed from the queue and dropped
 * @tparam A type of the values carried by the channel
 */
final class Channel[A] private(private val core: Core[A],
                               private val filter: A => Boolean = (_: A) => true) {

  import java.util.concurrent.TimeUnit
  import scala.annotation.tailrec

  // Upper bound on how long a waiting receiver goes without re-checking the closed flag.
  private val PollIntervalMillis = 50L

  /**
   * Enqueues `message`, blocking while the queue has no room for it.
   *
   * @throws IllegalStateException if the channel is already closed
   */
  private def send(message: Message[A]): Unit =
    if (isOpen) core.queue.put(message)
    else throw new IllegalStateException("channel is closed.")

  /**
   * Takes the next envelope off the queue.
   *
   * While the channel is open this waits in bounded slices instead of an
   * unbounded `take()`, so a receiver cannot stay parked forever when the
   * channel is closed after it started waiting. Once the channel is closed it
   * only drains what is already queued and never blocks.
   */
  @tailrec
  private def nextMessage(): Option[Message[A]] =
    if (isClosed) {
      Option(core.queue.poll())
    } else {
      Option(core.queue.poll(PollIntervalMillis, TimeUnit.MILLISECONDS)) match {
        case None  => nextMessage()
        case taken => taken
      }
    }

  /**
   * Receives one value.
   *
   * @return `Some(value)` for a value accepted by `filter`; `None` when the
   *         channel is closed and drained, when the end-of-stream marker was
   *         taken, or when the value taken was rejected by `filter`
   */
  private def receive: Option[A] =
    nextMessage().collect { case m if !m.last && filter(m.value) => m.value }

  /**
   * Sends `value`, blocking until the queue accepts it.
   *
   * @throws IllegalStateException if the channel is closed
   */
  def !(value: A): Unit = send(Message(value, last = false))

  /** Receives one value; see the `None` cases described on `receive`. */
  def unary_! : Option[A] = receive

  /**
   * Receives one value or fails.
   *
   * @throws NoSuchElementException if no value was delivered (channel closed,
   *                                or the value taken was filtered out)
   */
  def unary_~ : A =
    receive.getOrElse(throw new NoSuchElementException("no value received from channel."))

  /**
   * Returns a view of this channel that additionally requires `fn` to hold.
   *
   * The new predicate is combined with the existing one so that chained
   * guards (`for (x <- ch if p(x) if q(x))`) keep every condition.
   */
  def withFilter(fn: A => Boolean): Channel[A] = {
    val current = filter
    new Channel[A](core, filter = (a: A) => current(a) && fn(a))
  }

  /** Applies `fn` to every delivered value until the channel is closed and empty. */
  def foreach[U](fn: A => U): Unit = {
    while (isOpen || nonEmpty) {
      receive.foreach(fn)
    }
  }

  /**
   * Tests `fn` against delivered values, stopping at the first failure.
   *
   * @return `false` as soon as `fn` rejects a value; `true` if the channel is
   *         closed and empty before any value is rejected
   */
  def forall(fn: A => Boolean): Boolean = {
    var res = true
    while (res && (isOpen || nonEmpty)) {
      receive.foreach { message =>
        res = fn(message)
      }
    }
    res
  }

  /** Gathers every delivered value until the channel is closed and empty. */
  def collect: Seq[A] =
    collect[A](identity[A])

  /** Like the parameterless `collect`, mapping each delivered value through `fn`. */
  def collect[U](fn: A => U): Seq[U] = {
    // A builder keeps accumulation linear; appending to an immutable Seq per element is quadratic.
    val out = Vector.newBuilder[U]
    foreach { message =>
      out += fn(message)
    }
    out.result()
  }

  /**
   * Closes the channel. Safe to call more than once; never blocks.
   *
   * The end-of-stream marker is offered rather than put: it wakes a receiver
   * that is currently waiting (or is buffered if there is room), and if it
   * cannot be handed over, receivers still observe the closed flag on their
   * next poll. A blocking `put` here could hang forever when no receiver is
   * left to take the marker.
   */
  def close(): Unit = {
    val firstClose = core.synchronized {
      val wasOpen = !core.closed
      core.closed = true
      wasOpen
    }
    if (firstClose) {
      core.queue.offer(Message.last[A])
      ()
    }
  }

  /** `true` once `close()` has been called on any view of this channel. */
  def isClosed: Boolean = core.closed

  def isOpen: Boolean = !isClosed

  /**
   * Delegates to the queue's `isEmpty`. Note that a `SynchronousQueue`
   * (capacity 0) always reports itself as empty.
   */
  def isEmpty: Boolean = core.queue.isEmpty

  def nonEmpty: Boolean = !isEmpty
}
/**
 * Demo: one task sends the numbers 1 to 10 over a synchronous channel, another
 * prints the even ones, and `main` waits on a second channel for the producer
 * to report that it has finished.
 */
object ProducerConsumerExample {
  val messages: Channel[Int] = Channel.synchronous[Int]
  val done: Channel[Boolean] = Channel.synchronous[Boolean]

  /** Sends 1 to 10 with a 100 ms pause after each, closes `messages`, then signals `done`. */
  def produce(): Unit = {
    (1 to 10).foreach { i =>
      messages ! i
      Thread.sleep(100)
    }
    messages.close()
    done ! true
  }

  /** Prints every even number received from `messages`. */
  def consume(): Unit =
    messages
      .withFilter(msg => msg % 2 == 0)
      .foreach(msg => println(msg))

  /** Starts producer and consumer on the global execution context and waits for `done`. */
  def main(args: Array[String]): Unit = {
    import scala.concurrent.ExecutionContext.Implicits._
    import scala.concurrent.Future
    Future(produce())
    Future(consume())
    // Blocks until the producer sends its completion signal.
    done.unary_!
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment