Skip to content

Instantly share code, notes, and snippets.

@Daenyth
Last active March 25, 2021 04:34
Show Gist options
  • Save Daenyth/f704d6134a0230cef585f26ce113f188 to your computer and use it in GitHub Desktop.
Save Daenyth/f704d6134a0230cef585f26ce113f188 to your computer and use it in GitHub Desktop.
Iterate through a cursor-based API (e.g. paging) into an fs2 stream / "loopEval"
/** Walks a cursor-driven API and streams every value it yields, in order.
  *
  * `f` is evaluated once with `a` to obtain the first value together with an
  * optional cursor. While a cursor is present, `n` is evaluated with the most
  * recently obtained value and that cursor to fetch the following value and
  * the next optional cursor. The stream ends after emitting the value that
  * arrives without a cursor.
  *
  * @param a seed passed to `f` for the first request
  * @param f first request: produces the first value and an optional cursor
  * @param n follow-up request: given the previous value and its cursor,
  *          produces the next value and an optional cursor
  * @return a stream of every value produced, first to last
  */
def call[A, B](
    a: A,
    f: A => IO[(A, Option[B])],
    n: (A, B) => IO[(A, Option[B])]
): Stream[IO, A] = {
  // Emits `current`, then requests the value that follows it via `cursor`.
  def pages(current: A, cursor: B): Pull[IO, A, Unit] = {
    // Decides what to do with the response of a follow-up request.
    def advance(response: (A, Option[B])): Pull[IO, A, Unit] =
      response match {
        // More data is available: the recursive call emits `value` first.
        case (value, Some(following)) => pages(value, following)
        // No cursor: `value` is the final element, emit it and finish.
        case (value, _) => Pull.output1(value) >> Pull.done
      }

    Pull.output1(current) >> Pull.eval(n(current, cursor)).flatMap(r => advance(r))
  }

  // Turns the response of the very first request into the output stream.
  def start(response: (A, Option[B])): Stream[IO, A] =
    response match {
      case (head, Some(cursor)) => pages(head, cursor).stream
      case (head, None)         => Stream.emit(head)
    }

  Stream.eval(f(a)).flatMap(r => start(r))
}
/** Unfolds a stream from an effectful first step followed by an effectful
  * loop step.
  *
  * `f` is applied to `initial` and its effect is evaluated to obtain the first
  * output plus an optional continuation token. As long as a token is present,
  * `loop` is applied to the latest output and that token to obtain the next
  * output and the next optional token. Every output is emitted; the stream
  * ends after the output that comes without a token.
  *
  * @param initial seed handed to `f`
  * @param f       first step: yields the first output and an optional token
  * @param loop    subsequent step: given the previous output and its token,
  *                yields the next output and an optional token
  * @return a stream of all outputs in the order they were produced
  */
def unfoldEval2[F[_], A, B, C](initial: A)(
    f: A => F[(B, Option[C])],
    loop: (B, C) => F[(B, Option[C])]
): Stream[F, B] = {
  // Evaluates one step, emits its output, and continues with `loop` when the
  // step returned a token. The first step and every later step share this shape.
  def emitAndContinue(step: F[(B, Option[C])]): Stream[F, B] =
    Stream.eval(step).flatMap {
      case (out, Some(token)) =>
        Stream.emit(out) ++ emitAndContinue(loop(out, token))
      case (out, _) =>
        Stream.emit(out)
    }

  emitAndContinue(f(initial))
}
@Daenyth
Copy link
Author

Daenyth commented Jun 21, 2018

  /** Streams the result of `init` and of every follow-up effect derived from it.
    *
    * The effect `init` is evaluated and its result emitted. `next` is then
    * applied to that result: when it returns `Some(b)`, the process repeats
    * with `continue(b)` as the new effect; when it returns `None`, the stream
    * ends.
    *
    * @param init     effect producing the first element
    * @param next     extracts the token for the following request, if any
    * @param continue builds the effect for the following element from a token
    * @return a stream of every element produced, first to last
    */
  def loopEval[F[_], A, B](
      init: F[A],
      next: A => Option[B],
      continue: B => F[A]
  ): Stream[F, A] =
    Stream.eval(init).flatMap { current =>
      Stream.emit(current) ++
        next(current).fold[Stream[F, A]](Stream.empty) { token =>
          loopEval(continue(token), next, continue)
        }
    }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment