Skip to content

Instantly share code, notes, and snippets.

@aryairani
Created March 9, 2016 18:15
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aryairani/a532b071ef3f9c83e239 to your computer and use it in GitHub Desktop.
Save aryairani/a532b071ef3f9c83e239 to your computer and use it in GitHub Desktop.
package chunks
import java.time.LocalTime
import scalaz.concurrent.Task
import scalaz.stream._
/** Demo: fan a list of ints out to two scalaz-stream queues by parity and drain
  * both queues with several concurrent consumers, recording wall-clock times.
  */
object Blah {

  /** Entry point: runs the demo on 1..10 and prints one emitted event per line.
    *
    * An explicit `main` is used instead of `extends App` so that no code runs
    * through `DelayedInit` while worker threads are active.
    */
  def main(args: Array[String]): Unit =
    println(run((1 to 10).toList).mkString("\n"))

  /** Routes every element of `in` to one of two unbounded queues (even values to
    * the left queue, odd values to the right), processes the queued values with
    * a pool of consumers, and collects everything that was emitted.
    *
    * @param in the values to enqueue, in order
    * @return the merged event log: a left value is the time at which an element
    *         was enqueued; a right value is a processed element (input doubled)
    *         paired with the time at which its processing finished
    */
  def run(in: List[Int]): List[scalaz.\/[LocalTime, (Int, LocalTime)]] = {
    // Number of consumer processes attached to each queue.
    val workersPerQueue = 5
    // How long each element takes to "process"; presumably stands in for real work.
    val simulatedWorkMillis = 1000L

    val start = Process(in: _*)
    val left = async.unboundedQueue[Int]
    val right = async.unboundedQueue[Int]

    type LeftQueueEffect = Int => Task[Unit]
    type RightQueueEffect = Int => Task[Unit]

    // Pair every input element with the enqueue function of both queues.
    val step1: Process[Task, (Int, (LeftQueueEffect, RightQueueEffect))] =
      start.zip(left.enqueue.zip(right.enqueue))

    // Choose the target queue by parity; nothing is enqueued until the task runs.
    val step2: Process[Task, Task[Unit]] = step1.map {
      case (el, (lEnqueue, rEnqueue)) =>
        if (el % 2 == 0) lEnqueue(el) else rEnqueue(el)
    }

    // Run each enqueue task and emit the time at which it completed.
    val step3: Process[Task, LocalTime] = step2.eval.map(_ => LocalTime.now())

    // Close both queues once the producer side is done (left first, then right).
    val enqueue: Process[Task, LocalTime] =
      step3.onComplete(Process.eval_(left.close) ++ Process.eval_(right.close))

    // Blocks the running thread for `simulatedWorkMillis`, then doubles the element.
    val processElement = (el: Int) => Task {
      Thread.sleep(simulatedWorkMillis)
      el * 2
    }

    val lDequeue = left.dequeue.evalMap(processElement)
    val rDequeue = right.dequeue.evalMap(processElement)

    // A finite stream of consumer processes: `workersPerQueue` for the left queue
    // followed by `workersPerQueue` for the right queue, all handed to mergeN.
    val workers = Process.fill(workersPerQueue)(lDequeue) ++ Process.fill(workersPerQueue)(rDequeue)

    val nDequeue: Process[Task, (Int, LocalTime)] =
      merge.mergeN(workers).map(i => (i, LocalTime.now()))

    enqueue
      .wye(nDequeue)(wye.either) // emits time for enqueue or Int for dequeue
      .runLog.run.toList
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment