Skip to content

Instantly share code, notes, and snippets.

@cjweigle
Last active August 29, 2015 14:22
Show Gist options
  • Save cjweigle/6dd2becd6b4d686b3f4f to your computer and use it in GitHub Desktop.
Save cjweigle/6dd2becd6b4d686b3f4f to your computer and use it in GitHub Desktop.
scalaz streams gate 1
package finance;
import scalaz.stream._
import scalaz.concurrent.Task
import scalaz.stream.async.mutable.Queue;
import scalaz.stream.{Cause, Util, Process, Sink}
object gate1
{
//V is the value type
//T is the type returned from this gate
def gate[V, T](inputs : Seq[Process[Any,V]], f : Seq[V] => T) : Process[Task, T] = {
new Gate[V,T](inputs, f).output
}
}
class Gate[V,T](inputs : Seq[Process[Any,V]], f : Seq[V] => T)
{ self =>
private val queues: Seq[Queue[V]] = inputs.map(stream => {
val queue = async.boundedQueue[V](10)
stream to queue.enqueue
queue
})
private val output_q = async.boundedQueue[T](10)
def output = Process.suspend {
import scalaz.Scalaz._
if(queues.forall(v=>{ v.size.discrete.runLast.run match {
case Some(size) => (size > 0)
case None => false
}}))
{
println("all values are here")
val step_map: Seq[V] = queues.map(in_queue => {in_queue.dequeue.take(1).runLast.run match {
case Some(valu) => valu
case None => null
}})
output_q.enqueueOne(f(step_map))
}
inputs map {in => in.awaitOption}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment