Skip to content

Instantly share code, notes, and snippets.

@djspiewak
Last active November 2, 2016 13:59
Show Gist options
  • Save djspiewak/6b5cd3fb78b054046755 to your computer and use it in GitHub Desktop.
Save djspiewak/6b5cd3fb78b054046755 to your computer and use it in GitHub Desktop.
sealed trait AcceptT[+F[+_], -I, +O]
object AcceptT {
final case class Effect[+F[+_], +O](f: F[O]) extends AcceptT[F, Any, O]
final case class Accept[+F[+_], I]() extends AcceptT[F, I, I]
}
type Transducer[+F[+_], -I, +O] = Process[({ type λ[+α] = AcceptT[F, I, α] })#λ, O]
// type Process1[I, O] = Transducer[Identity, I, O]
// type Channel = Obsolete
// type Sink[F[+_], A] = Transducer[F, A, Unit]
object transducer {
def receiveF[F[+_], A, I, O](effect: F[A])(f: A => Transducer[F, I, O]): Transducer[F, I, O] =
await[({ type λ[+α] = AcceptT[F, I, α] })#λ, A, O](AcceptT.Effect(effect))(f)
def receive1[F[+_], I, O](f: I => Transducer[F, I, O]): Transducer[F, I, O] =
await[({ type λ[+α] = AcceptT[F, I, α] })#λ, I, O](AcceptT.Accept[F, I]())(f)
}
implicit class TransducerSyntax[F[+_], A](val self: Process[F, A]) extends AnyVal {
import Cause.{EarlyCause, End, Kill}
def transduce[B](trans: Transducer[F, A, B])(implicit F: Applicative[F], C: Catchable[F]): Process[F, B] = trans.suspendStep flatMap {
// fail suck ugliness to get around limitations in GADT pattern matching
case Halt(rsn) => self.kill onHalt { _ => Halt(rsn) }
case step: Step[({ type λ[+α] = AcceptT[F, A, α] })#λ, B] => {
val cont1 = step.next
step.head match {
case awt: Await[({ type λ[+α] = AcceptT[F, A, α] })#λ, a, B] => {
val req = awt.req
val rcv = awt.rcv
req match {
case AcceptT.Effect(eff) => {
val fp = eff.attempt map { _ leftMap { Error(_) } } map { res =>
self transduce (rcv(res).run +: cont1) // TODO errors? I don't have access to Util.Try, because... blergh
}
eval(fp) flatMap { a => a }
}
case AcceptT.Accept() => self.step match {
case Step(awt @ Await(_, _), cont) => awt extend { p => new TransducerSyntax[F, A](p +: cont) transduce step.toProcess }
case Step(Emit(Seq()), cont) => cont.continue transduce step.toProcess
case Step(Emit(is), cont) => {
val nextT = rcv(\/-(is.head.asInstanceOf[a])).run +: cont1 // this is sound because Accept proves it so
val nextS = emitAll(is.tail) +: cont
new TransducerSyntax[F, A](nextS) transduce nextT
}
case hlt @ Halt(End) => new TransducerSyntax[F, A](hlt) transduce (step.toProcess disconnect Kill swallowKill)
case hlt @ Halt(rsn: Cause.EarlyCause) => new TransducerSyntax[F, A](hlt) transduce (step.toProcess disconnect rsn)
}
}
}
case emt @ Emit(os) => {
emt onHalt {
case End => self transduce cont1.continue
case early => self transduce (Halt(early) +: cont1) causedBy early
}
}
}
}
}
}
object TransducerSpecs extends Specification with ScalaCheck {
import Process._
import StreamUtils._
"effectful stream transducers" should {
def id[I]: Transducer[Nothing, I, I] =
transducer.receive1[Nothing, I, I](emit).repeat
"perform a simple identity transformation" in prop { xs: List[List[Int]] =>
val p = emitAll(xs map emitAll).toSource.join
(p transduce id).runLog.run mustEqual xs.flatten
}
"perform an arbitrary transformation" in prop { (xs: List[List[Int]], f: Int => Int) =>
val t = transducer receive1 { i: Int => emit(f(i)) } repeat
val p = emitAll(xs map emitAll).toSource.join
(p transduce t).runLog.run mustEqual (xs.flatten map f)
}
"control the state machine based on an effect" in {
def subT1(i: Int) = emit(i * 2)
def subT2(i: Int) = emit(i)
val t = transducer receive1 { i: Int =>
transducer.receiveF(Task now true) { b =>
if (b) subT1(i) else subT2(i)
}
} repeat
(Process(1, 2, 3) transduce t).runLog.run mustEqual Vector(2, 4, 6)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment