|
package asynciterator |
|
|
|
import kotlinx.coroutines.* |
|
import kotlin.coroutines.intrinsics.createCoroutineUnintercepted |
|
import kotlin.coroutines.* |
|
import kotlin.experimental.ExperimentalTypeInference |
|
import kotlin.system.getTimeMillis |
|
|
|
|
|
interface AsyncCoroutineScope<T> : CoroutineScope { |
|
suspend fun yield(value: T) |
|
} |
|
|
|
|
|
interface AsyncIterator<T> { |
|
operator fun iterator() = this |
|
suspend operator fun hasNext(): Boolean |
|
suspend operator fun next(): T |
|
} |
|
|
|
|
|
private sealed class Status<T> |
|
private class NotReady<T> : Status<T>() |
|
private class Ready<T>(val value: T) : Status<T>() |
|
private class Done<T> : Status<T>() |
|
private class Failed<T>(val exception: Throwable) : Status<T>() |
|
|
|
|
|
class AsyncIteratorImpl<T>( |
|
override val coroutineContext: CoroutineContext |
|
) : AsyncIterator<T>, AsyncCoroutineScope<T>, Continuation<Unit> { |
|
private lateinit var mainContinuation: Continuation<Status<T>> |
|
internal lateinit var sequenceContinuation: Continuation<Unit> |
|
|
|
private var status: Status<T> = NotReady() |
|
|
|
override operator fun iterator() = this |
|
|
|
private suspend fun ensureNext() { |
|
if (status is NotReady) { |
|
status = suspendCoroutine { |
|
mainContinuation = it |
|
sequenceContinuation.resume(Unit) |
|
} |
|
status.let { if (it is Failed) throw it.exception } |
|
} |
|
} |
|
|
|
private fun throwBadStatus(): Nothing { |
|
when (val status = status) { |
|
is Done -> throw NoSuchElementException("Asynchronous iterator is exhausted") |
|
is Failed -> throw IllegalStateException("Asynchronous sequence block previously threw an exception", status.exception) |
|
else -> throw IllegalStateException("This ain't possible") |
|
} |
|
} |
|
|
|
override suspend operator fun hasNext(): Boolean { |
|
ensureNext() |
|
if (status is Done) return false |
|
if (status is Ready) return true |
|
throwBadStatus() |
|
} |
|
|
|
override suspend operator fun next(): T { |
|
ensureNext() |
|
status.let { if (it is Ready) return it.value.also { status = NotReady() } } |
|
throwBadStatus() |
|
} |
|
|
|
override suspend fun yield(value: T) = suspendCancellableCoroutine<Unit> { |
|
sequenceContinuation = it |
|
mainContinuation.resume(Ready(value)) |
|
} |
|
|
|
// the following is completion continuation stuff. resumeWith here will be called when the coroutine |
|
// either run to the very end or throws an exception; this includes CancellationException |
|
override val context = coroutineContext |
|
|
|
override fun resumeWith(result: Result<Unit>) { |
|
val exception = result.exceptionOrNull() |
|
if (exception is CancellationException) return |
|
mainContinuation.resume(if (exception != null) Failed(exception) else Done()) |
|
} |
|
} |
|
|
|
|
|
@OptIn(ExperimentalTypeInference::class) |
|
fun <T> CoroutineScope.asyncSequence(@BuilderInference block: suspend AsyncCoroutineScope<T>.() -> Unit): AsyncIterator<T> { |
|
return AsyncIteratorImpl<T>(coroutineContext).apply { |
|
sequenceContinuation = block.createCoroutineUnintercepted(this, this) |
|
} |
|
} |