Created
November 27, 2013 11:39
-
-
Save pchlupacek/7674344 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package scalaz.stream | |
import scalaz.stream.Process._ | |
import scalaz.stream.These.{This, That} | |
sealed trait ReceiveThese[+A,+B] | |
case class Receive[+A,+B](t:These[A,B]) extends ReceiveThese[A,B] | |
case class HaltL(e:Throwable) extends ReceiveThese[Nothing,Nothing] | |
case class HaltR(e:Throwable) extends ReceiveThese[Nothing,Nothing] | |
trait wyer { | |
import wyer._ | |
/** | |
* Let through the right branch as long as the left branch is `false`, | |
* listening asynchronously for the left branch to become `true`. | |
* This halts as soon as the right branch halts. | |
*/ | |
def interrupt[I]: WyeR[Boolean, I, I] = { | |
def go[I]: WyeR[Boolean, I, I] = awaitRcvBoth[Boolean,I].flatMap { | |
case Receive(That(None)) => halt | |
case Receive(That(i)) => emit(i) ++ go | |
case Receive(This(kill)) => if (kill) halt else go | |
case Receive(These(kill, i)) => if (kill) halt else emit(i) ++ go | |
case Receive(These(kill, None)) => halt | |
case HaltL(e) => go | |
case HaltR(e) => Halt(e) | |
} | |
go[I] | |
} | |
} | |
object wyer { | |
case class ReceiveEnv[-I,-I2]() { | |
sealed trait Y[-X] { | |
def tag: Int | |
} | |
sealed trait Rcv[-X,-X2] extends Y[ReceiveThese[X,X2]] | |
case object RcvBoth extends Rcv[I,I2] { | |
def tag: Int = 2 | |
} | |
case object RcvL extends Rcv[I,Nothing] { | |
def tag: Int = 0 | |
} | |
case object RcvR extends Rcv[Nothing,I2] { | |
def tag: Int = 1 | |
} | |
} | |
private val RcvBoth_ = ReceiveEnv[Any,Any].RcvBoth | |
private val RcvL_ = ReceiveEnv[Any,Any].RcvL | |
private val RcvR_ = ReceiveEnv[Any,Any].RcvR | |
def RcvBoth[I,I2] : ReceiveEnv[I,I2]#Y[ReceiveThese[I,I2]] = RcvBoth_ | |
def RcvL[I] : ReceiveEnv[I,Nothing]#Y[ReceiveThese[I,Nothing]] = RcvL_ | |
def RcvR[I2] : ReceiveEnv[Nothing,I2]#Y[ReceiveThese[Nothing,I2]] = RcvR_ | |
def awaitRcvBoth[I,I2]: WyeR[I,I2,ReceiveThese[I,I2]] = | |
await(RcvBoth[I,I2])(emit) | |
def awaitRcvL[I,I2]: WyeR[I,I2,ReceiveThese[I,Nothing]] = | |
await(RcvL[I])(emit) | |
def awaitRcvR[I,I2]: WyeR[I,I2,ReceiveThese[Nothing,I2]] = | |
await(RcvR[I2])(emit) | |
def feedR[I,I2,O](i: Seq[I2])(p: WyeR[I,I2,O]): WyeR[I,I2,O] = { | |
def go(in: Seq[I2], out: Vector[Seq[O]], cur: WyeR[I,I2,O]): WyeR[I,I2,O] = { | |
if (in.nonEmpty) cur match { | |
case h@Halt(_) => emitSeq(out.flatten, h) | |
case Emit(h, t) => go(in, out :+ h, t) | |
} else { | |
emitSeq(out.flatten,cur) | |
} | |
} | |
go(i,Vector(),p) | |
} | |
private object AwaitRcv { | |
def unapply[I,I2,O](tag:Int, self: WyeR[I,I2,O]): | |
Option[(ReceiveThese[I,I2] => WyeR[I,I2,O], WyeR[I,I2,O], WyeR[I,I2,O])] = self match { | |
case Await(req ,recv,fb,c) if req.tag == tag => Some((recv.asInstanceOf[ReceiveThese[I,I2] => WyeR[I,I2,O]], fb, c)) | |
case _ => None | |
} | |
} | |
object AwaitRcvBoth { | |
def unapply[I,I2,O](self: WyeR[I,I2,O]) | |
:Option[(ReceiveThese[I,I2] => WyeR[I,I2,O], WyeR[I,I2,O], WyeR[I,I2,O])] = AwaitRcv.unapply(2,self) | |
def apply[I,I2,O](recv: ReceiveThese[I,I2] => WyeR[I,I2,O], | |
fallback: WyeR[I,I2,O] = halt, | |
cleanup: WyeR[I,I2,O] = halt): WyeR[I,I2,O] = | |
await(RcvBoth[I,I2])(recv, fallback, cleanup) | |
} | |
object AwaitRcvL { | |
def unapply[I,I2,O](self: WyeR[I,I2,O]): | |
Option[(ReceiveThese[I,I2] => WyeR[I,I2,O], WyeR[I,I2,O], WyeR[I,I2,O])] = AwaitRcv.unapply(0,self) | |
def apply[I,I2,O](recv: ReceiveThese[I,I2] => WyeR[I,I2,O], | |
fallback: WyeR[I,I2,O] = halt, | |
cleanup: WyeR[I,I2,O] = halt): WyeR[I,I2,O] = | |
await(RcvL[I])(recv, fallback, cleanup) | |
} | |
object AwaitRcvR { | |
def unapply[I,I2,O](self: WyeR[I,I2,O]): | |
Option[(ReceiveThese[I,I2] => WyeR[I,I2,O], WyeR[I,I2,O], WyeR[I,I2,O])] = AwaitRcv.unapply(1,self) | |
def apply[I,I2,O](recv: ReceiveThese[I,I2] => WyeR[I,I2,O], | |
fallback: WyeR[I,I2,O] = halt, | |
cleanup: WyeR[I,I2,O] = halt): WyeR[I,I2,O] = | |
await(RcvR[I2])(recv, fallback, cleanup) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment