Skip to content

Instantly share code, notes, and snippets.

@araqnid
Created February 6, 2021 23:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save araqnid/3d092e5fac19235352ec4c703742da80 to your computer and use it in GitHub Desktop.
Save araqnid/3d092e5fac19235352ec4c703742da80 to your computer and use it in GitHub Desktop.
Match sequence/Flow using a coroutine
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.RestrictsSuspension
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine
@RestrictsSuspension
interface MatcherScope<out E : Any> {
suspend fun yield(): E?
}
interface Matcher<in E, out R> {
fun push(item: E)
fun finish()
val isTerminated: Boolean
val producedResult: R?
val threwException: Throwable?
}
inline fun <R, T> Matcher<*, R>.foldResult(onSuccess: (R) -> T, onFailure: (Throwable) -> T): T? {
if (!isTerminated) return null
return if (threwException != null) {
onFailure(threwException!!)
} else {
@Suppress("UNCHECKED_CAST")
onSuccess(producedResult as R)
}
}
private class MatcherCoroutine<E : Any, R>(block: suspend MatcherScope<E>.() -> R) : MatcherScope<E>, Matcher<E, R> {
override var isTerminated = false
private set
override var producedResult: R? = null
private set
override var threwException: Throwable? = null
private set
private var pushedEvent: E? = null
private var onNextPush: Continuation<E?>? = null
init {
block.startCoroutine(this, object : Continuation<R> {
override val context: CoroutineContext
get() = EmptyCoroutineContext
override fun resumeWith(result: Result<R>) {
isTerminated = true
result.fold({ producedResult = it }, { threwException = it })
}
})
}
private fun shift(): E? {
val shifted = pushedEvent
pushedEvent = null
return shifted
}
override suspend fun yield(): E? {
val event = shift()
if (event != null) return event
return suspendCoroutine { cont ->
onNextPush = cont
}
}
override fun push(item: E) {
check(!isTerminated) { "Matcher already terminated" }
onNextPush!!.resume(item)
check(isTerminated || onNextPush != null) { "Matcher should have either terminated or suspended" }
}
override fun finish() {
check(!isTerminated) { "Matcher already terminated" }
onNextPush!!.resume(null)
check(isTerminated || onNextPush != null) { "Matcher should have either terminated or suspended" }
}
}
fun <E : Any, R> matcher(block: suspend MatcherScope<E>.() -> R): Matcher<E, R> {
return MatcherCoroutine(block)
}
fun <E : Any, R> matchSequence(stream: Sequence<E>, block: suspend MatcherScope<E>.() -> R): R {
val matcher: Matcher<E, R> = matcher(block)
for (item in stream) {
matcher.push(item)
matcher.foldResult({ return it }, { throw it })
}
matcher.finish()
matcher.foldResult({ return it }, { throw it })
error("Matcher function did not complete")
}
private class EarlyTermination : RuntimeException()
fun <E : Any, R> Flow<E>.matchElements(block: suspend MatcherScope<E>.() -> R): Flow<R> = flow {
val matcher: Matcher<E, R> = matcher(block)
var result: R? = null
try {
collect { item: E ->
matcher.push(item)
matcher.foldResult({ result = it; throw EarlyTermination() }, { throw it })
}
matcher.finish()
matcher.foldResult({ emit(it); return@flow }, { throw it })
} catch (e: EarlyTermination) {
@Suppress("UNCHECKED_CAST")
emit(result as R)
return@flow
}
error("Matcher function did not complete")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment