Skip to content

Instantly share code, notes, and snippets.

@vhutov
Created July 23, 2020 09:23
Show Gist options
  • Save vhutov/cfdb274aae762027de28ce6ef26bdaad to your computer and use it in GitHub Desktop.
Save vhutov/cfdb274aae762027de28ce6ef26bdaad to your computer and use it in GitHub Desktop.
PartialStreamConsume
/** Syntax for eagerly consuming the leading part of an fs2 `Stream` while keeping the remainder as a stream. */
package object consume {

  /** Enriches any `Stream[F, A]` with the `consumeUntil` family of operations. */
  implicit class RichStream[F[_], A](s: Stream[F, A]) {

    /**
      * Collects every element that precedes the first one satisfying `f`.
      *
      * Delegates to [[consumeUntilAs]] with a mapping that keeps every element (`_.some`), so the
      * returned stream starts with the element that satisfied `f`.
      *
      * @param f predicate marking the element at which collection stops
      * @return the collected prefix together with the stream of remaining elements
      */
    def consumeUntil(f: A => Boolean)
                    (implicit C: Stream.Compiler[F, F], ME: MonadError[F, Throwable]): F[(Vector[A], Stream[F, A])] =
      consumeUntilAs(f, _.some)

    /**
      * Collects the elements that precede the first one satisfying `f`, passing each through `g`.
      *
      * Elements for which `g` returns `None` are dropped, both from the collected prefix and from
      * the returned remainder. The element that satisfied `f` is not part of the prefix: it is the
      * first input of the remainder and therefore goes through `g` like any other element.
      * When no element satisfies `f`, the whole stream ends up in the prefix and the remainder is empty.
      *
      * NOTE(review): the remainder is assembled from the tail obtained while pulling and is run
      * only after this first compilation has finished - confirm this is acceptable for streams
      * that acquire resources.
      *
      * @param f predicate marking the element at which collection stops
      * @param g mapping applied to every element; `None` results are discarded
      * @return the mapped prefix together with the mapped stream of remaining elements
      */
    def consumeUntilAs[B](f: A => Boolean, g: A => Option[B])
                         (implicit C: Stream.Compiler[F, F], ME: MonadError[F, Throwable]): F[(Vector[B], Stream[F, B])] = {
      // The pull below outputs exactly one (prefix, remainder) pair before finishing.
      val single = consumeUntil0(f, g).take(1)
      single.compile.lastOrError
    }

    /**
      * Builds a stream that emits a single `(prefix, remainder)` pair, walking the source chunk by
      * chunk until an element satisfying `f` is found or the source ends.
      */
    private def consumeUntil0[B](f: A => Boolean, g: A => Option[B]): Stream[F, (Vector[B], Stream[F, B])] = {
      def loop(remaining: Stream[F, A], acc: Vector[B]): Pull[F, (Vector[B], Stream[F, B]), Unit] =
        remaining.pull.uncons.flatMap {
          case Some((chunk, rest)) =>
            chunk.indexWhere(f) match {
              case Some(hit) =>
                // Split right before the matching element: it stays at the head of the remainder.
                val (prefix, suffix) = chunk.splitAt(hit)
                val collected = acc ++ prefix.toVector.mapFilter(g)
                val leftover = (Stream.chunk(suffix) ++ rest).mapFilter(g)
                Pull.output1(collected -> leftover) >> Pull.done
              case None =>
                // No match in this chunk: keep all of it and continue with the next one.
                loop(rest, acc ++ chunk.toVector.mapFilter(g))
            }
          case None =>
            // Source exhausted without a match: everything was collected, nothing remains.
            Pull.output1(acc -> Stream.empty) >> Pull.done
        }

      loop(s, Vector()).stream
    }
  }
}
/**
  * Demonstrates `consumeUntilAs`: the `Right` values that precede the first `Left` marker become
  * the initial state, and the remaining `Right` values are folded onto that state one by one.
  */
object UseCase extends IOApp {

  /**
    * Builds a stream of `Either[String, Int]`, consumes it up to the first `Left`, then scans the
    * remaining integers onto the collected prefix, printing each intermediate buffer.
    *
    * Expected output, one buffer per line: `Vector(1, 2, 3)`, `Vector(1, 2, 3, 4)`,
    * `Vector(1, 2, 3, 4, 5)`, `Vector(1, 2, 3, 4, 5, 6)`.
    *
    * @param args command-line arguments (unused)
    * @return `ExitCode.Success` once the remainder has been drained
    */
  override def run(args: List[String]): IO[ExitCode] = {
    val s = Stream[IO, Either[String, Int]](1.asRight, 2.asRight, 3.asRight) ++
      Stream.sleep_(1.second) ++
      Stream[IO, Either[String, Int]]("token".asLeft) ++
      Stream[IO, Either[String, Int]](4.asRight, 5.asRight, "token".asLeft, 6.asRight)

    // `Either` is right-biased, so `toOption` is called directly instead of going through the
    // deprecated `.right` projection; `Left` markers map to `None` and are dropped.
    val partiallyConsumed: IO[(Vector[Int], Stream[IO, Int])] =
      s.consumeUntilAs(_.isLeft, _.toOption)

    partiallyConsumed.flatMap { case (initialState, rest) =>
      // `scan` emits the initial state first, then one grown buffer per remaining element.
      rest.scan(initialState)(_ :+ _)
        .evalTap(buff => IO(println(buff)))
        .compile
        .drain
    }.as(ExitCode.Success)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment