Skip to content

Instantly share code, notes, and snippets.

@joshcough
Last active August 29, 2015 14:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save joshcough/09a40b0a42ee61581d79 to your computer and use it in GitHub Desktop.
Save joshcough/09a40b0a42ee61581d79 to your computer and use it in GitHub Desktop.
/**
 * Syntax helpers for piping a `Process` through two `Channel`s in one call.
 *
 * Both helpers are built on `flatMap`: every element of the source is wrapped
 * in its own one-element process (`Process.emit`) and that small process is
 * piped through the two channels.
 *
 * NOTE(review): because a fresh `Process.emit(...)` is built per element, the
 * channels are attached once per element rather than once for the whole
 * stream. Presumably harmless for constant, stateless channels -- confirm
 * before using this with channels that carry state or manage resources.
 */
object Procs {

  /** Adds `through2` to any `Process[F, A]`. */
  implicit class RichProcess[F[_], A](p: Process[F, A]) {

    /**
     * Sends each element of the process through `c1`, then sends what `c1`
     * produced through `c2`.
     *
     * @param c1 channel applied to the elements of the process
     * @param c2 channel applied to the output of `c1`
     * @return a process of the values produced by `c2`
     */
    def through2[B, C](c1: Channel[F, A, B], c2: Channel[F, B, C]): Process[F, C] =
      p.flatMap(composeChannels(c1, c2))
  }

  /** Adds `observeThrough2` to any `Process[Task, A]`. */
  implicit class RichTaskProcess[A](p: Process[Task, A]) {

    /**
     * Like `through2`, but keeps the intermediate values: each output is the
     * original element together with the results of `c1` and `c2`.
     *
     * @param c1 channel applied to the elements of the process
     * @param c2 channel applied to the output of `c1`
     * @return a process of `(input, output of c1, output of c2)` triples
     */
    def observeThrough2[B, C](c1: Channel[Task, A, B],
                              c2: Channel[Task, B, C]): Process[Task, (A, B, C)] =
      p.flatMap(composeThroughChannels(c1, c2))
  }

  /**
   * Builds a function that pipes a single value through `c1` and then `c2`.
   *
   * @param c1 first channel; receives the input value
   * @param c2 second channel; receives the output of `c1`
   * @return a function from one input to the process obtained by emitting
   *         that input and piping it through both channels
   */
  def composeChannels[F[_], A, B, C](c1: Channel[F, A, B],
                                     c2: Channel[F, B, C]): A => Process[F, C] =
    (input: A) => {
      val viaFirst = Process.emit(input).through(c1)
      viaFirst.through(c2)
    }

  /**
   * Builds a function that pipes a single value through `c1` and then `c2`
   * while retaining the input and the intermediate result.
   *
   * @param c1 first channel; receives the input value
   * @param c2 second channel; receives the output of `c1`
   * @return a function from one input to a process of
   *         `(input, output of c1, output of c2)` triples
   */
  def composeThroughChannels[A, B, C](c1: Channel[Task, A, B],
                                      c2: Channel[Task, B, C]): A => Process[Task, (A, B, C)] =
    (input: A) => {
      // After the first stage each element is an (input, c1 output) pair.
      val withFirst = Process.emit(input).observeThrough(c1)
      withFirst
        // c2 only understands B, so feed it the second component of the pair.
        .observeThrough(c2.contramap(_._2))
        // Flatten ((a, b), c) into a plain triple.
        .map { case ((a, b), c) => (a, b, c) }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment