Skip to content

Instantly share code, notes, and snippets.

@pchlupacek
Created November 27, 2013 11:39
Show Gist options
  • Save pchlupacek/7674344 to your computer and use it in GitHub Desktop.
Save pchlupacek/7674344 to your computer and use it in GitHub Desktop.
package scalaz.stream
import scalaz.stream.Process._
import scalaz.stream.These.{This, That}
/**
 * Message delivered to a two-input process in response to a receive request.
 *
 * It is either a payload from one or both inputs ([[Receive]]) or a
 * notification carrying the `Throwable` associated with one input
 * ([[HaltL]] for the left, [[HaltR]] for the right).
 *
 * @tparam A type of values coming from the left input
 * @tparam B type of values coming from the right input
 */
sealed trait ReceiveThese[+A,+B]

/** Payload message: `t` holds a left value, a right value, or both. */
final case class Receive[+A,+B](t:These[A,B]) extends ReceiveThese[A,B]

/**
 * Left-input notification carrying the cause `e`.
 * NOTE(review): presumably emitted when the left input halts - confirm against the driver.
 */
final case class HaltL(e:Throwable) extends ReceiveThese[Nothing,Nothing]

/**
 * Right-input notification carrying the cause `e`.
 * NOTE(review): presumably emitted when the right input halts - confirm against the driver.
 */
final case class HaltR(e:Throwable) extends ReceiveThese[Nothing,Nothing]
trait wyer {
  import wyer._

  /**
   * Let through the right branch as long as the left branch is `false`,
   * listening asynchronously for the left branch to become `true`.
   * This halts as soon as the right branch halts.
   *
   * Message handling, in match order:
   *  - a right value of `None` halts the process, whether it arrives alone
   *    or together with a left value
   *    (NOTE(review): `None` appears to be used as an end-of-input marker
   *    for the right side - confirm, since `I` is unconstrained);
   *  - any other right value is emitted, unless an accompanying left value
   *    is `true`, in which case the process halts;
   *  - a lone left value halts when `true` and is ignored when `false`;
   *  - `HaltL` is ignored and the process keeps listening;
   *  - `HaltR(e)` halts the process with the same cause `e`.
   *
   * @tparam I type of the values passed through from the right branch
   */
  def interrupt[I]: WyeR[Boolean, I, I] = {
    // `go` deliberately has no type parameter of its own: it reuses the `I`
    // of `interrupt` instead of shadowing it with an unrelated one.
    def go: WyeR[Boolean, I, I] = awaitRcvBoth[Boolean, I].flatMap {
      case Receive(That(None))     => halt
      case Receive(That(i))        => emit(i) ++ go
      case Receive(This(kill))     => if (kill) halt else go
      // Must precede the general `These(kill, i)` case below; listed after
      // it, this branch could never be reached.
      case Receive(These(_, None)) => halt
      case Receive(These(kill, i)) => if (kill) halt else emit(i) ++ go
      case HaltL(_)                => go
      case HaltR(e)                => Halt(e)
    }
    go
  }
}
object wyer {

  // Integer tags identifying which side(s) a request listens on. They are
  // shared by the request objects in `ReceiveEnv` and by the `AwaitRcv*`
  // extractors so the two can never drift apart.
  private final val TagL = 0
  private final val TagR = 1
  private final val TagBoth = 2

  /**
   * Request environment of a two-input process.
   *
   * Every request is a `Y[X]` carrying an integer `tag`; the three concrete
   * requests are `RcvL` (left only), `RcvR` (right only) and `RcvBoth`
   * (either side), all answered with a [[ReceiveThese]].
   *
   * @tparam I  type of values on the left input
   * @tparam I2 type of values on the right input
   */
  case class ReceiveEnv[-I,-I2]() {
    sealed trait Y[-X] {
      def tag: Int
    }
    sealed trait Rcv[-X,-X2] extends Y[ReceiveThese[X,X2]]
    case object RcvBoth extends Rcv[I,I2] {
      def tag: Int = TagBoth
    }
    case object RcvL extends Rcv[I,Nothing] {
      def tag: Int = TagL
    }
    case object RcvR extends Rcv[Nothing,I2] {
      def tag: Int = TagR
    }
  }

  // Single shared request instances, created at `Any` and handed out at
  // narrower types by the accessors below.
  private val RcvBoth_ = ReceiveEnv[Any,Any]().RcvBoth
  private val RcvL_ = ReceiveEnv[Any,Any]().RcvL
  private val RcvR_ = ReceiveEnv[Any,Any]().RcvR

  /** Request for a message from either input. */
  def RcvBoth[I,I2] : ReceiveEnv[I,I2]#Y[ReceiveThese[I,I2]] = RcvBoth_

  /**
   * Request for a message from the left input only.
   * NOTE(review): `ReceiveEnv` is contravariant, so `Nothing` in the unused
   * slot yields a supertype of every `ReceiveEnv[I, _]` - confirm whether
   * `Any` was intended here.
   */
  def RcvL[I] : ReceiveEnv[I,Nothing]#Y[ReceiveThese[I,Nothing]] = RcvL_

  /**
   * Request for a message from the right input only.
   * NOTE(review): same variance question as for `RcvL` - confirm.
   */
  def RcvR[I2] : ReceiveEnv[Nothing,I2]#Y[ReceiveThese[Nothing,I2]] = RcvR_

  /** Awaits one message from either input and emits it. */
  def awaitRcvBoth[I,I2]: WyeR[I,I2,ReceiveThese[I,I2]] =
    await(RcvBoth[I,I2])(emit)

  /** Awaits one message from the left input and emits it. */
  def awaitRcvL[I,I2]: WyeR[I,I2,ReceiveThese[I,Nothing]] =
    await(RcvL[I])(emit)

  /** Awaits one message from the right input and emits it. */
  def awaitRcvR[I,I2]: WyeR[I,I2,ReceiveThese[Nothing,I2]] =
    await(RcvR[I2])(emit)

  /**
   * Feeds the values `i` to `p` as if they had arrived on its right input.
   *
   * The process is stepped while input remains:
   *  - output it emits is collected and re-emitted in order in front of the
   *    resulting process;
   *  - a request on the right input, or on both inputs, is answered with
   *    the next value wrapped as `Receive(That(value))`;
   *  - a request on the left input only cannot be answered here, so the
   *    request is re-issued with the remaining input fed to whichever
   *    continuation (receive, fallback or cleanup) eventually runs;
   *  - if the process halts, the remaining input is dropped.
   *
   * Exceptions thrown by a receive function propagate to the caller.
   * NOTE(review): they are not mapped onto the fallback/cleanup processes -
   * confirm whether that mapping is wanted here.
   *
   * @param i values to deliver on the right input, in order
   * @param p process to feed
   * @return the process that remains once the input has been delivered,
   *         preceded by everything emitted along the way
   */
  def feedR[I,I2,O](i: Seq[I2])(p: WyeR[I,I2,O]): WyeR[I,I2,O] = {
    @annotation.tailrec
    def go(in: Seq[I2], out: Vector[Seq[O]], cur: WyeR[I,I2,O]): WyeR[I,I2,O] = {
      if (in.nonEmpty) cur match {
        case h@Halt(_) => emitSeq(out.flatten, h)
        case Emit(h, t) => go(in, out :+ h, t)
        case AwaitRcvR(recv, _, _) =>
          val next = recv(Receive(That(in.head)))
          go(in.tail, out, next)
        case AwaitRcvBoth(recv, _, _) =>
          val next = recv(Receive(That(in.head)))
          go(in.tail, out, next)
        case AwaitRcvL(recv, fb, c) =>
          // Only the left side is being listened to: keep the pending
          // right-hand input and feed it once the process moves on.
          val suspended = AwaitRcvL[I,I2,O](
            r => feedR[I,I2,O](in)(recv(r)),
            feedR[I,I2,O](in)(fb),
            feedR[I,I2,O](in)(c))
          emitSeq(out.flatten, suspended)
      } else {
        emitSeq(out.flatten,cur)
      }
    }
    go(i,Vector(),p)
  }

  /**
   * Shared matching logic of the public `AwaitRcv*` extractors: succeeds
   * when `self` is an `Await` whose request carries the given `tag`, and
   * returns its receive function, fallback and cleanup.
   */
  private object AwaitRcv {
    def unapply[I,I2,O](tag:Int, self: WyeR[I,I2,O]):
        Option[(ReceiveThese[I,I2] => WyeR[I,I2,O], WyeR[I,I2,O], WyeR[I,I2,O])] = self match {
      // The receive function is cast to the widest message type handled here.
      case Await(req ,recv,fb,c) if req.tag == tag => Some((recv.asInstanceOf[ReceiveThese[I,I2] => WyeR[I,I2,O]], fb, c))
      case _ => None
    }
  }

  /** Builds and matches processes awaiting a message from either input. */
  object AwaitRcvBoth {
    def unapply[I,I2,O](self: WyeR[I,I2,O])
        :Option[(ReceiveThese[I,I2] => WyeR[I,I2,O], WyeR[I,I2,O], WyeR[I,I2,O])] = AwaitRcv.unapply(TagBoth,self)
    def apply[I,I2,O](recv: ReceiveThese[I,I2] => WyeR[I,I2,O],
                      fallback: WyeR[I,I2,O] = halt,
                      cleanup: WyeR[I,I2,O] = halt): WyeR[I,I2,O] =
      await(RcvBoth[I,I2])(recv, fallback, cleanup)
  }

  /** Builds and matches processes awaiting a message from the left input. */
  object AwaitRcvL {
    def unapply[I,I2,O](self: WyeR[I,I2,O]):
        Option[(ReceiveThese[I,I2] => WyeR[I,I2,O], WyeR[I,I2,O], WyeR[I,I2,O])] = AwaitRcv.unapply(TagL,self)
    def apply[I,I2,O](recv: ReceiveThese[I,I2] => WyeR[I,I2,O],
                      fallback: WyeR[I,I2,O] = halt,
                      cleanup: WyeR[I,I2,O] = halt): WyeR[I,I2,O] =
      await(RcvL[I])(recv, fallback, cleanup)
  }

  /** Builds and matches processes awaiting a message from the right input. */
  object AwaitRcvR {
    def unapply[I,I2,O](self: WyeR[I,I2,O]):
        Option[(ReceiveThese[I,I2] => WyeR[I,I2,O], WyeR[I,I2,O], WyeR[I,I2,O])] = AwaitRcv.unapply(TagR,self)
    def apply[I,I2,O](recv: ReceiveThese[I,I2] => WyeR[I,I2,O],
                      fallback: WyeR[I,I2,O] = halt,
                      cleanup: WyeR[I,I2,O] = halt): WyeR[I,I2,O] =
      await(RcvR[I2])(recv, fallback, cleanup)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment