Skip to content

Instantly share code, notes, and snippets.

@ahoy-jon
Created November 21, 2019 21:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ahoy-jon/90cabce02b64aeae9f9c6b9ebbbfffc1 to your computer and use it in GitHub Desktop.
Save ahoy-jon/90cabce02b64aeae9f9c6b9ebbbfffc1 to your computer and use it in GitHub Desktop.
class ToIterator private (runtime: Runtime[Any]) {
def unsafeCreate[V](q: UIO[stream.Stream[_,V]]): Iterator[V] =
new Iterator[V] {
import ToIterator._
var state: State[V] = Running
val synchronousBlockQueue: BlockingQueue[ValueOrClosed[V]] = new SynchronousQueue[ValueOrClosed[V]]
private def put(value:ValueOrClosed[V]): Task[Unit] = IO.effect(synchronousBlockQueue.put(value))
private val io = q >>= (_.map(Value.apply).foreachManaged(put).use_(put(Closed)))
private val thread: Thread = new Thread() {
override def run(): Unit =
runtime.unsafeRunAsync_(io)
}
var threadStarted = false
def checkStarted(): Unit = {
if(!threadStarted) thread.start()
threadStarted = true
}
private def pool(): ValueOrClosed[V] = {
checkStarted()
synchronousBlockQueue.take()
}
override def hasNext: Boolean =
state match {
case Closed => false
case Value(_) => true
case Running =>
state = pool()
state != Closed
}
private val undefinedBehavior = new NoSuchElementException("next on empty iterator")
override def next(): V =
state match {
case Value(value) =>
state = Running
value
case Closed => throw undefinedBehavior
case Running =>
pool() match {
case Closed =>
state = Closed
throw undefinedBehavior
case Value(v) =>
//stage = Running
v
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment