Skip to content

Instantly share code, notes, and snippets.

@ahoy-jon
Last active October 29, 2019 22:59
Show Gist options
  • Save ahoy-jon/915c87e1d2f857eb5b72049960e7680a to your computer and use it in GitHub Desktop.
Save ahoy-jon/915c87e1d2f857eb5b72049960e7680a to your computer and use it in GitHub Desktop.
object ZioToSparkIterator {
def toIterator[E, A](q: UIO[stream.Stream[E, A]]): Iterator[Either[E, A]] = new Iterator[Either[E, A]] {
sealed trait State
case object Running extends State
sealed trait ValueOrClosed extends State
case object Closed extends ValueOrClosed
case class Value(value: Either[E, A]) extends ValueOrClosed
var state: State = Running
val queue: SynchronousQueue[ValueOrClosed] = new SynchronousQueue()
val streamToConcurrentQueue: ZIO[Any, Throwable, Unit] = for {
stream <- q
_ <- stream.either.foreach(x => ZIO.effect(queue.put(Value(x))))
_ <- ZIO.effect(queue.put(Closed))
} yield {}
val zioThread: Thread = new Thread {
override def run(): Unit =
new DefaultRuntime {}.unsafeRun(streamToConcurrentQueue)
}
zioThread.start()
private def pool(): ValueOrClosed = queue.take()
override def hasNext: Boolean =
state match {
case Closed => false
case _: Value => true
case Running =>
state = pool()
state != Closed
}
private val undefinedBehavior = new Exception("called next on a closed Iterator")
override def next(): Either[E, A] =
state match {
case Value(value) =>
state = Running
value
case Closed => throw undefinedBehavior
case Running =>
pool() match {
case Closed =>
state = Closed
throw undefinedBehavior
case Value(v) =>
//stage = Running
v
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment