@justinhj
Last active April 9, 2018 18:43
Trying to write a function using fs2 that streams elements until a given item has been seen N times
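import fs2.{Pipe, Pull, Stream}

// Emits elements from the input until `thing` has been seen `n` times
// (the n-th occurrence is emitted too), then stops.
def takeUntilNThings[F[_], O](n: Long, thing: O): Pipe[F, O, O] = {
  // seenCount is a Long so that it has the same type as n.
  def go(s: Stream[F, O], seenCount: Long): Pull[F, O, Unit] = {
    // >= rather than == so that a non-positive n yields an empty stream.
    if (seenCount >= n) {
      Pull.done
    }
    else {
      s.pull.uncons1.flatMap {
        case Some((o, tl)) =>
          if (o == thing) {
            // A match: emit it and bump the count.
            Pull.output1(o) >> go(tl, seenCount + 1)
          }
          else {
            // Not a match: pass the element through without counting it.
            Pull.output1(o) >> go(tl, seenCount)
          }
        case None =>
          // Input ended before n matches were seen.
          Pull.done
      }
    }
  }
  in => go(in, 0L).stream
}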
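
For comparison, here is a rough sketch of the same idea using built-in Stream combinators instead of a hand-written Pull loop. It assumes a reasonably recent fs2 (1.x or later) where zipWithScan1 and takeThrough are available on Stream. The names takeUntilNThingsViaScan and TakeUntilNThingsSketch are made up for this example, and treating a non-positive n as "emit nothing" is an assumption, not something the gist states.

```scala
import fs2.{Pipe, Pure, Stream}

object TakeUntilNThingsSketch {

  // Hypothetical alternative: pair every element with the running count of
  // matches (including the element itself), keep elements through the one
  // that brings the count up to n, then drop the counts again.
  def takeUntilNThingsViaScan[F[_], O](n: Long, thing: O): Pipe[F, O, O] =
    in =>
      if (n <= 0) Stream.empty // assumption: nothing to wait for, emit nothing
      else
        in.zipWithScan1(0L)((seen, o) => if (o == thing) seen + 1 else seen)
          .takeThrough { case (_, seen) => seen < n }
          .map { case (o, _) => o }

  def main(args: Array[String]): Unit = {
    // Pure stream, so no effect type is needed to run it.
    val out = Stream(1, 2, 1, 3, 1, 4)
      .through(takeUntilNThingsViaScan[Pure, Int](2L, 1))
      .toList
    println(out) // List(1, 2, 1)
  }
}
```

takeThrough is used rather than takeWhile so that the n-th matching element is itself emitted before the stream stops.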