Created
February 6, 2021 23:41
-
-
Save araqnid/3d092e5fac19235352ec4c703742da80 to your computer and use it in GitHub Desktop.
Match sequence/Flow using a coroutine
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kotlinx.coroutines.flow.Flow | |
import kotlinx.coroutines.flow.collect | |
import kotlinx.coroutines.flow.flow | |
import kotlin.coroutines.Continuation | |
import kotlin.coroutines.CoroutineContext | |
import kotlin.coroutines.EmptyCoroutineContext | |
import kotlin.coroutines.RestrictsSuspension | |
import kotlin.coroutines.resume | |
import kotlin.coroutines.startCoroutine | |
import kotlin.coroutines.suspendCoroutine | |
/**
 * Receiver available inside a matcher block.
 *
 * Marked [RestrictsSuspension], so a matcher block can only suspend through the
 * members declared here.
 */
@RestrictsSuspension
interface MatcherScope<out E : Any> {
    /**
     * Suspends until the next input item is available and returns it.
     * A `null` return value stands for "no item" (the element type itself is non-null).
     */
    suspend fun yield(): E?
}
/**
 * Push-style handle on a matching computation that consumes items of type [E]
 * and may eventually produce a result of type [R] or fail with an exception.
 */
interface Matcher<in E, out R> {
    /** Feeds one input item to the matcher. */
    fun push(item: E)

    /** Signals that no further items will be pushed. */
    fun finish()

    /** `true` once the matcher has stopped, whether with a result or with an exception. */
    val isTerminated: Boolean

    /** Result value of the matcher, if it has produced one. */
    val producedResult: R?

    /** Exception the matcher failed with, or `null` if it has not failed. */
    val threwException: Throwable?
}
/**
 * Dispatches on the outcome of a terminated matcher.
 *
 * @param onSuccess invoked with the produced result when the matcher terminated without an exception.
 * @param onFailure invoked with the recorded exception when the matcher terminated by throwing.
 * @return the value returned by the invoked callback, or `null` when the matcher has not terminated yet
 *   (in which case neither callback is invoked).
 */
inline fun <R, T> Matcher<*, R>.foldResult(onSuccess: (R) -> T, onFailure: (Throwable) -> T): T? {
    if (!isTerminated) return null
    // Read the property once so the null check and the use see the same value (no `!!` needed).
    val failure = threwException
    return if (failure != null) {
        onFailure(failure)
    } else {
        // producedResult is declared R? only because it is unset before termination;
        // for a nullable R, null is itself a legitimate result, hence the cast instead of a null check.
        @Suppress("UNCHECKED_CAST")
        onSuccess(producedResult as R)
    }
}
/**
 * [Matcher] implementation that runs [block] as a coroutine with this object as its [MatcherScope]
 * receiver, handing it one pushed item per [yield] call.
 *
 * The coroutine is started in `init` with an [EmptyCoroutineContext]; its completion (value or
 * exception) is recorded in [isTerminated], [producedResult] and [threwException].
 */
private class MatcherCoroutine<E : Any, R>(block: suspend MatcherScope<E>.() -> R) : MatcherScope<E>, Matcher<E, R> {
    override var isTerminated = false
        private set
    override var producedResult: R? = null
        private set
    override var threwException: Throwable? = null
        private set

    // Continuation of the block while it is parked inside yield(); cleared as soon as it is resumed,
    // so a non-null value always means "block is currently waiting for input".
    private var onNextPush: Continuation<E?>? = null

    init {
        block.startCoroutine(this, object : Continuation<R> {
            override val context: CoroutineContext
                get() = EmptyCoroutineContext

            override fun resumeWith(result: Result<R>) {
                isTerminated = true
                result.fold({ producedResult = it }, { threwException = it })
            }
        })
    }

    /** Parks the block until [push] or [finish] resumes it with the next item (or `null`). */
    override suspend fun yield(): E? = suspendCoroutine { cont -> onNextPush = cont }

    /**
     * Resumes the waiting block with [item].
     *
     * @throws IllegalStateException if the matcher has already terminated or is not waiting for input.
     */
    override fun push(item: E) = deliver(item)

    /**
     * Resumes the waiting block with `null` to signal the end of input.
     *
     * @throws IllegalStateException if the matcher has already terminated or is not waiting for input.
     */
    override fun finish() = deliver(null)

    // Shared implementation of push/finish: hand `value` to the parked block and verify where it ended up.
    private fun deliver(value: E?) {
        check(!isTerminated) { "Matcher already terminated" }
        val waiting = checkNotNull(onNextPush) { "Matcher is not waiting for input" }
        // Clear before resuming: the block either installs a fresh continuation via yield() or
        // terminates, which is exactly what the check below verifies.
        onNextPush = null
        waiting.resume(value)
        check(isTerminated || onNextPush != null) { "Matcher should have either terminated or suspended" }
    }
}
/**
 * Creates a [Matcher] whose behaviour is defined by [block].
 *
 * @param block matcher body; it receives a [MatcherScope] and obtains input items through it.
 * @return a push-style handle backed by a `MatcherCoroutine` running [block].
 */
fun <E : Any, R> matcher(block: suspend MatcherScope<E>.() -> R): Matcher<E, R> = MatcherCoroutine(block)
/**
 * Runs [block] as a matcher over the items of [stream] and returns its result.
 *
 * Items are pushed one at a time; iteration stops as soon as the matcher terminates, so the
 * remainder of [stream] is not consumed. When [stream] is exhausted first, the matcher is told
 * that input has ended via `finish()`.
 *
 * @param stream the input items.
 * @param block the matcher body.
 * @return the value returned by [block].
 * @throws Throwable whatever [block] throws is rethrown to the caller.
 * @throws IllegalStateException if [block] still has not completed after end of input was signalled.
 */
fun <E : Any, R> matchSequence(stream: Sequence<E>, block: suspend MatcherScope<E>.() -> R): R {
    val matcher: Matcher<E, R> = matcher(block)
    // The block may complete (or throw) without ever asking for input; pushing to or finishing
    // an already-terminated matcher would fail, so resolve that case up front.
    matcher.foldResult({ return it }, { throw it })
    for (item in stream) {
        matcher.push(item)
        matcher.foldResult({ return it }, { throw it })
    }
    matcher.finish()
    matcher.foldResult({ return it }, { throw it })
    error("Matcher function did not complete")
}
/** Internal signal exception, thrown to break out of a flow collection before the upstream is exhausted. */
private class EarlyTermination : RuntimeException()
/**
 * Returns a flow that runs [block] as a matcher over the elements of this flow and emits the
 * matcher's single result.
 *
 * Upstream collection is abandoned as soon as the matcher terminates. When the upstream completes
 * first, the matcher is told that input has ended via `finish()`.
 *
 * @param block the matcher body.
 * @return a flow emitting exactly one value, the result of [block]; an exception thrown by [block]
 *   fails the flow, and an [IllegalStateException] is raised if [block] never completes.
 */
fun <E : Any, R> Flow<E>.matchElements(block: suspend MatcherScope<E>.() -> R): Flow<R> = flow {
    val matcher: Matcher<E, R> = matcher(block)
    // One signal instance per collection: a nested matchElements downstream or upstream throws its
    // own instance, which must pass through here untouched instead of being mistaken for ours.
    val stop = EarlyTermination()
    // The block may already have completed without asking for input; then there is nothing to collect.
    if (!matcher.isTerminated) {
        try {
            collect { item: E ->
                matcher.push(item)
                if (matcher.isTerminated) throw stop
            }
        } catch (e: EarlyTermination) {
            if (e !== stop) throw e
        }
        if (!matcher.isTerminated) matcher.finish()
    }
    // emit happens outside the try/catch above, so exceptions raised by the downstream collector
    // are never intercepted here.
    matcher.foldResult({ emit(it); return@flow }, { throw it })
    error("Matcher function did not complete")
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment