Skip to content

Instantly share code, notes, and snippets.

@justinhj

justinhj/takeuntiln.scala

Last active Apr 9, 2018
Embed
What would you like to do?
Trying to write a function using fs2 to stream until N items have been seen
def takeUntilNThings[F[_],O](n: Long, thing: O): Pipe[F,O,O] = {
def go(s: Stream[F,O], seenCount : Int) : Pull[F,O,Unit] = {
if(seenCount == n) {
Pull.done
}
else {
s.pull.uncons1.flatMap {
case Some((o, tl)) =>
if(o == thing) {
Pull.output1(o) >> go(tl, seenCount + 1)
}
else {
Pull.output1(0) >> go(tl, seenCount)
}
case None =>
Pull.done
}
}
}
in => go(in, 0).stream
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment