@pchlupacek
Created November 14, 2016 11:05
Lifting a scalaz.stream.Process1 to an fs2.Pipe
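// Imports assumed for fs2 0.9.x and scalaz-stream 0.8.x; adjust to the versions in use.
import fs2.{Chunk, Handle, Pipe, Pull}
import scalaz.stream.Process1
import scalaz.stream.Process.Halt
import scalaz.stream.Cause.{End, Error, Kill}

/**
 * Runs the supplied Process1 `process` in the scope of the resulting pipe.
 */
def liftProcess1[F[_], I, O](process: Process1[I, O]): Pipe[F, I, O] = {
  def go(p1: Process1[I, O]): Handle[F, I] => Pull[F, O, Unit] = {
    _.receive1 {
      // Feed one input element and split off whatever the process emitted.
      case (evt, h) => p1.feed1(evt).unemit match {
        case (out, np1) => np1 match {
          // Normal termination: emit the remaining output and stop.
          case Halt(End | Kill) => Pull.output(Chunk.seq(out))
          // Failure: emit the output produced so far, then fail the pull.
          case Halt(Error(rsn)) => Pull.output(Chunk.seq(out)) >> Pull.fail(rsn)
          // Still running: emit the output and continue with the next state.
          case other => Pull.output(Chunk.seq(out)) >> go(other)(h)
        }
      }
    }
  }
  _.pull(go(process))
}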
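
A minimal usage sketch, not part of the original gist: it assumes fs2 0.9.x and scalaz-stream 0.8.x are on the classpath and that `liftProcess1` as defined above is in scope. The names `increment` and `result` are hypothetical and only serve the illustration.

```scala
// Assumes fs2 0.9.x, scalaz-stream 0.8.x, and `liftProcess1` from above in scope.
import fs2.{Stream, Task}
import scalaz.stream.process1

// A scalaz-stream Process1 that increments every element it receives.
val increment = process1.lift[Int, Int](_ + 1)

// Run the lifted process as an fs2 pipe over a small stream evaluated in Task.
// The effect type is given explicitly so that `F` does not have to be inferred.
val result: Vector[Int] =
  Stream.emits(Seq(1, 2, 3))
    .covary[Task]
    .through(liftProcess1[Task, Int, Int](increment))
    .runLog
    .unsafeRun()
```

Because the pipe only reacts to incoming elements, a `Process1` that emits on termination (for example `process1.last`) would presumably not get a chance to flush when the upstream ends; that case may need separate handling.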