Skip to content

Instantly share code, notes, and snippets.

@attilakruchio
Last active September 14, 2023 07:49
Show Gist options
  • Save attilakruchio/05d24f0eb9ed004f9539750a49b18759 to your computer and use it in GitHub Desktop.
Save attilakruchio/05d24f0eb9ed004f9539750a49b18759 to your computer and use it in GitHub Desktop.
EventChannelFlow implementation that requires emission and collection to happen on the Main.immediate dispatcher, preventing any event loss.
/*
* Created by Attila Kruchió on 2023. 09. 13. 22:24
* attila.kruchio.dev@gmail.com
*
* Copyright (c) 2023.
* All rights reserved.
*/
package com.attila.kruchio.android.flow
import android.util.Log
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.withContext
/**
 * A unidirectional [Flow] tailored for one-shot events, where each event is meant to be observed exactly once.
 *
 * Delivery of every emitted event to the collector can only be guaranteed when events are both
 * sent and collected on [Dispatchers.Main.immediate].
 *
 * The type parameter is declared covariant (`out T`), mirroring [Flow], so an
 * `EventChannelFlow<Sub>` can be used wherever an `EventChannelFlow<Base>` is expected.
 *
 * Important note: this flow is designed to work reliably with a single collector. If several collectors
 * collect the same [EventChannelFlow], each emitted event is delivered to only one of them.
 */
public interface EventChannelFlow<out T> : Flow<T>
/**
* A mutable [EventChannelFlow] that is also a [FlowCollector], so values can be emitted into it.
*
* Instances are created with the `MutableEventChannelFlow(contextPolicy)` factory function.
*/
public interface MutableEventChannelFlow<T> : EventChannelFlow<T>, FlowCollector<T> {
/**
* Emits [value] into this flow.
*/
public override suspend fun emit(value: T)
}
/**
 * Specifies how an [EventChannelFlow] treats the coroutine context in which events are sent and collected.
 * Each policy determines what happens when the current context is not [Dispatchers.Main.immediate].
 */
public sealed interface ContextPolicy {

    /**
     * Automatically switches to [Dispatchers.Main.immediate] when events are sent or collected
     * from any other [CoroutineContext].
     *
     * Important note: this breaks the usual context hierarchy. Even if you collect the flow on,
     * for example, [Dispatchers.IO], the work is moved to [Dispatchers.Main.immediate] under the hood.
     *
     * It is basically equivalent to the following code:
     * ```kotlin
     * withContext(Dispatchers.IO) {
     *     withContext(Dispatchers.Main.immediate) {
     *         // collecting or sending events will happen here
     *     }
     * }
     * ```
     */
    object SwitchAutomatically : ContextPolicy

    /**
     * Requires the current coroutine to be executed on [Dispatchers.Main.immediate].
     * If that is not the case, an [IllegalStateException] is thrown, signaling that events are being
     * collected or sent on the wrong [CoroutineContext].
     */
    object Require : ContextPolicy

    /**
     * Does not enforce any specific context policy.
     */
    object None : ContextPolicy

    /**
     * Uses a specific coroutine context for event handling.
     *
     * @property coroutineContext The coroutine context to be used. Its dispatcher must not be
     * [Dispatchers.Main.immediate].
     * @throws IllegalArgumentException if [coroutineContext] dispatches on [Dispatchers.Main.immediate].
     */
    data class WithContext(val coroutineContext: CoroutineContext) : ContextPolicy {
        init {
            // Compare the context's dispatcher (its ContinuationInterceptor element) instead of the
            // context object itself, so that combined contexts such as
            // `Dispatchers.Main.immediate + CoroutineName("x")` are rejected as well.
            require(coroutineContext[ContinuationInterceptor] !== Dispatchers.Main.immediate) {
                "Main.immediate is used by the Default policy, no need to specify the policy as WithContext"
            }
        }
    }

    companion object {
        /**
         * The default context policy, which is [SwitchAutomatically].
         */
        val Default: ContextPolicy
            get() = SwitchAutomatically
    }
}
/**
 * Creates a new instance of [MutableEventChannelFlow], which can be used to emit one-shot events.
 *
 * @param contextPolicy The context policy applied when sending and collecting events.
 * Defaults to [ContextPolicy.Default], which is [ContextPolicy.SwitchAutomatically].
 * @return A new instance of [MutableEventChannelFlow].
 */
public fun <T> MutableEventChannelFlow(
    contextPolicy: ContextPolicy = ContextPolicy.Default
): MutableEventChannelFlow<T> = EventChannelFlowImpl(contextPolicy)
/**
 * Default [MutableEventChannelFlow] implementation backed by an unlimited-capacity [Channel].
 *
 * Both sending and collecting go through [withPolicy], which applies the configured
 * [ContextPolicy] before the channel is touched.
 */
private class EventChannelFlowImpl<T>(
    private val contextPolicy: ContextPolicy
) : MutableEventChannelFlow<T> {

    private val eventChannel = Channel<T>(Channel.UNLIMITED)
    private val eventFlow = eventChannel.receiveAsFlow()

    /** Collects the channel-backed flow under the configured [ContextPolicy]. */
    override suspend fun collect(collector: FlowCollector<T>): Unit = withPolicy {
        eventFlow.collect(collector)
    }

    /** Sends [value] to the underlying channel under the configured [ContextPolicy]. */
    override suspend fun emit(value: T): Unit = withPolicy {
        eventChannel.send(value)
    }

    /**
     * Runs [block] according to [contextPolicy]: as-is (with a warning when off Main.immediate),
     * after a context check, after switching to Main.immediate, or inside the policy's own context.
     */
    private suspend fun withPolicy(block: suspend () -> Unit): Unit =
        when (val activePolicy = contextPolicy) {
            ContextPolicy.None -> {
                val offMainImmediate = !isOnMainImmediate()
                if (offMainImmediate) {
                    Log.w(
                        TAG,
                        "ContextPolicy is set to None and not running on Main.immediate." +
                            " This means, that you could lose events"
                    )
                }
                block()
            }

            ContextPolicy.Require -> {
                requireCurrentContextIsMainImmediate()
                block()
            }

            ContextPolicy.SwitchAutomatically -> {
                // Only switch when needed; when already on Main.immediate the block runs in place.
                val needsSwitch = !isOnMainImmediate()
                if (needsSwitch) {
                    withContext(Dispatchers.Main.immediate) { block() }
                } else {
                    block()
                }
            }

            is ContextPolicy.WithContext -> withContext(activePolicy.coroutineContext) { block() }
        }

    /** Returns the [ContinuationInterceptor] element of the calling coroutine's context, if any. */
    private suspend fun currentInterceptor(): ContinuationInterceptor? =
        currentCoroutineContext()[ContinuationInterceptor]

    /** True when the calling coroutine's interceptor is exactly [Dispatchers.Main.immediate]. */
    private suspend fun isOnMainImmediate(): Boolean =
        currentInterceptor() === Dispatchers.Main.immediate

    /**
     * Fails with an [IllegalStateException] unless the calling coroutine's interceptor
     * is [Dispatchers.Main.immediate].
     */
    private suspend fun requireCurrentContextIsMainImmediate() {
        val interceptor = currentInterceptor()
        check(interceptor === Dispatchers.Main.immediate) {
            "Current coroutineContext is not Dispatchers.Main.immediate, it is $interceptor"
        }
    }

    companion object {
        private const val TAG = "EventChannelFlow"
    }
}
/**
 * Exposes this [MutableEventChannelFlow] as a read-only [EventChannelFlow].
 *
 * The receiver is wrapped rather than returned directly, so the result cannot be cast
 * back to [MutableEventChannelFlow].
 *
 * @return An immutable [EventChannelFlow] view of this flow.
 */
public fun <T> MutableEventChannelFlow<T>.asEventChannelFlow(): EventChannelFlow<T> =
    ReadonlyEventChannelFlow(this)
/**
* Read-only wrapper that implements [EventChannelFlow] by delegating every call to [flow],
* without exposing any other interface the wrapped instance may implement.
*/
private class ReadonlyEventChannelFlow<T>(
flow: EventChannelFlow<T>
) : EventChannelFlow<T> by flow
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment