Skip to content

Instantly share code, notes, and snippets.

@ahoy-jon
Created February 5, 2020 10:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ahoy-jon/284bff4495c7bd42c4a610fb26174b0c to your computer and use it in GitHub Desktop.
Save ahoy-jon/284bff4495c7bd42c4a610fb26174b0c to your computer and use it in GitHub Desktop.
def toIterator[R, E, A](
stream: ZStream[R, E, A]
): ZManaged[R, Nothing, Iterator[Either[E, A]]] = {
/*
* Internal state of a Iterator pulling from a ZStream
*
* It starts as Running
*
* when Running , on pull (hasNext), pull the ZStream and switch to Closed or Value
* when Value , on consume (next), return the Value and switch to Running
* when Closed , on pull (hasNext), stays Closed
*/
sealed trait State
case object Running extends State
sealed trait ValueOrClosed extends State
case object Closed extends ValueOrClosed
case class Value(value: Either[E, A]) extends ValueOrClosed
for {
state <- RefM.make[State](Running).toManaged_
pull <- stream.process
runtime <- ZIO.runtime[R].toManaged_
} yield {
val pool: URIO[R, ValueOrClosed] =
pull.fold({
case None => Closed
case Some(e) => Value(Left(e))
}, x => Value(Right(x)))
val _next: RIO[R, Either[E, A]] = {
def loop: State => URIO[R, (Either[Throwable, Either[E, A]], State)] = {
case Closed => UIO(Left(new NoSuchElementException("next on empty iterator")) -> Closed)
case Value(x) => UIO(Right(x) -> Running)
case Running => pool >>= loop
}
state
.modify(loop)
.absolve
}
val _hasNext: URIO[R, Boolean] = {
def loop: State => URIO[R, (Boolean, State)] = {
case Closed => UIO(false -> Closed)
case x: Value => UIO(true -> x)
case Running => pool >>= loop
}
state.modify(loop)
}
new Iterator[Either[E, A]] {
override def hasNext: Boolean = runtime.unsafeRun(_hasNext)
override def next(): Either[E, A] = runtime.unsafeRun(_next)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment