@pchiusano
Created February 10, 2014 15:31
Prefetching combinator for converting a `Tee` to a `Wye`, allowing for bounded nondeterminism
/**
 * Convert the given `Tee` to a `Wye`, by allowing for bounded nondeterminism.
 * That is, when the `Tee` requests from the left, as long as fewer than
 * `maxBuffer` elements are enqueued on the right, convert this to an
 * `AwaitBoth`. Likewise when the `Tee` requests from the right. Thus, elements
 * are fed to the `Tee` in the order it demands, but we allow for
 * nondeterminism in fetching the elements demanded.
 */
def prefetch[A,B,C](maxBuffer: Int)(tee: Tee[A,B,C]): Wye[A,B,C] = {
  import scalaz.stream.tee.{AwaitL,AwaitR}
  def go(cur: Tee[A,B,C], bufL: Vector[A], bufR: Vector[B]): Wye[A,B,C] =
    cur match {
      case h@Halt(_) => h
      case Emit(h, t) => Emit(h, go(t, bufL, bufR))
      case AwaitL(recv,fb,c) => if (bufR.size < maxBuffer) getBoth(cur, bufL, bufR)
                                else ...
      case AwaitR(recv,fb,c) => if (bufL.size < maxBuffer) getBoth(cur, bufL, bufR)
                                else
      case _ => awaitBoth[A,B].flatMap { ... }
    }
  def getBoth(cur: Tee[A,B,C], bufL: Vector[A], bufR: Vector[B]): Wye[A,B,C] = ...
  go(tee, Vector(), Vector())
}
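The buffering rule described in the comment can be looked at in isolation from scalaz-stream. The following is a minimal sketch of just that decision, not the gist's implementation: `PrefetchPolicy`, `Side`, `Request`, and `decide` are hypothetical names introduced for illustration, and the behaviour when the opposite buffer is full (await only the demanded side) is an assumption, since the corresponding `else` branches are elided in the snippet.

```scala
object PrefetchPolicy {
  // Which input the Tee is currently demanding (hypothetical model type).
  sealed trait Side
  case object LeftSide extends Side
  case object RightSide extends Side

  // What the resulting Wye would wait for (hypothetical model type).
  sealed trait Request
  case object AwaitLeftOnly extends Request
  case object AwaitRightOnly extends Request
  case object AwaitEither extends Request // stands in for `AwaitBoth`

  // Mirrors the two `if` tests in `go`: a demand on one side is widened to
  // "either side" while the opposite buffer still has room.
  // ASSUMPTION: once the opposite buffer holds `maxBuffer` elements, we fall
  // back to awaiting only the demanded side; the original leaves this elided.
  def decide(demand: Side, bufLSize: Int, bufRSize: Int, maxBuffer: Int): Request =
    demand match {
      case LeftSide =>
        if (bufRSize < maxBuffer) AwaitEither else AwaitLeftOnly
      case RightSide =>
        if (bufLSize < maxBuffer) AwaitEither else AwaitRightOnly
    }

  def main(args: Array[String]): Unit = {
    // Right buffer has room, so a left demand may be satisfied from either side.
    println(decide(LeftSide, bufLSize = 0, bufRSize = 1, maxBuffer = 2)) // AwaitEither
    // Right buffer is full, so only the left input is awaited.
    println(decide(LeftSide, bufLSize = 0, bufRSize = 2, maxBuffer = 2)) // AwaitLeftOnly
  }
}
```

Under this reading, `maxBuffer` bounds how far the non-demanded side can run ahead of the `Tee`, while the order in which elements reach the `Tee` is still the order it demands them.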