Last active
September 14, 2023 07:49
-
-
Save attilakruchio/05d24f0eb9ed004f9539750a49b18759 to your computer and use it in GitHub Desktop.
EventChannelFlow implementation, which requires that the emission and collection happens on Main.immediate dispatcher, preventing any event losses.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Created by Attila Kruchió on 2023. 09. 13. 22:24 | |
* attila.kruchio.dev@gmail.com | |
* | |
* Copyright (c) 2023. | |
* All rights reserved. | |
*/ | |
package com.attila.kruchio.android.flow | |
import android.util.Log | |
import kotlin.coroutines.ContinuationInterceptor | |
import kotlin.coroutines.CoroutineContext | |
import kotlinx.coroutines.Dispatchers | |
import kotlinx.coroutines.channels.Channel | |
import kotlinx.coroutines.currentCoroutineContext | |
import kotlinx.coroutines.flow.Flow | |
import kotlinx.coroutines.flow.FlowCollector | |
import kotlinx.coroutines.flow.receiveAsFlow | |
import kotlinx.coroutines.withContext | |
/** | |
* Represents a unidirectional flow of data tailored for one-shot event scenarios. It ensures that a specific event is observed exactly once. | |
* | |
* This flow guarantees that all emitted events will be delivered to the collector, preventing any loss. | |
* One-shot events can only be guaranteed to arrive to the collector, if we are dispatching and collecting the | |
* events on [Dispatchers.Main.immediate] | |
* | |
* Important Note: This flow is designed to work reliably with a single collector. If multiple collectors are used with the same [EventChannelFlow], | |
* only the first collector will receive the emitted event. | |
*/ | |
public interface EventChannelFlow<T> : Flow<T> | |
/** | |
* Represents a mutable version of [EventChannelFlow] that allows emitting a value. | |
*/ | |
public interface MutableEventChannelFlow<T> : EventChannelFlow<T>, FlowCollector<T> { | |
public override suspend fun emit(value: T) | |
} | |
/** | |
* Specifies the context policy for handling events in the [EventChannelFlow]. | |
* Different policies determine how events are processed with respect to coroutine contexts. | |
*/ | |
public sealed interface ContextPolicy { | |
/** | |
* Switches the [CoroutineContext] automatically to [Dispatchers.Main.immediate] if the events are not collected or sent | |
* on this [CoroutineContext]. | |
* | |
* Important note: This will break the context hierarchy, since it does not matter, if you want to | |
* collect the flow on for eg.: [Dispatchers.IO], under-the-hood it'll switch context to [Dispatchers.Main.immediate]. | |
* | |
* Basically equals with the following code: | |
* ```kotlin | |
* withContext(Dispatchers.IO) { | |
* withContext(Dispatchers.Main.immediate) { | |
* // collecting or sending events will happen here | |
* } | |
* } | |
* ``` | |
*/ | |
object SwitchAutomatically : ContextPolicy | |
/** | |
* Requires the current coroutine to be executed on [Dispatchers.Main.immediate]. | |
* If that's not the case, an [IllegalStateException] will be thrown, signing that you are collecting | |
* or receiving events on the wrong [CoroutineContext] | |
*/ | |
object Require : ContextPolicy | |
/** | |
* Does not enforce any specific context policy. | |
*/ | |
object None : ContextPolicy | |
/** | |
* Uses a specific coroutine context for event handling. | |
* @property coroutineContext The coroutine context to be used. Must not be [Dispatchers.Main.immediate]. | |
*/ | |
data class WithContext(val coroutineContext: CoroutineContext) : ContextPolicy { | |
init { | |
require(coroutineContext !== Dispatchers.Main.immediate) { | |
"Main.immediate is used by the Default policy, no need to specify the policy as WithContext" | |
} | |
} | |
} | |
companion object { | |
/** | |
* Provides a default context policy, which is set to [SwitchAutomatically]. | |
*/ | |
val Default: ContextPolicy | |
get() = SwitchAutomatically | |
} | |
} | |
/** | |
* Creates a new instance of [MutableEventChannelFlow]. This function allows you to create a mutable event flow, | |
* which can be used to emit events. | |
* | |
* @param contextPolicy The context policy for event handling. Default is Require. | |
* @return A new instance of [MutableEventChannelFlow]. | |
*/ | |
public fun <T> MutableEventChannelFlow( | |
contextPolicy: ContextPolicy = ContextPolicy.Default | |
): MutableEventChannelFlow<T> { | |
return EventChannelFlowImpl(contextPolicy) | |
} | |
/** | |
* Implementation of [MutableEventChannelFlow] interface. This class provides the underlying implementation for emitting | |
* and collecting values in an underlying [Channel] implementation based on the given [ContextPolicy]. | |
*/ | |
private class EventChannelFlowImpl<T>( | |
private val contextPolicy: ContextPolicy | |
) : MutableEventChannelFlow<T> { | |
private val internalChannel = Channel<T>(Channel.UNLIMITED) | |
private val internalChannelFlow = internalChannel.receiveAsFlow() | |
override suspend fun collect(collector: FlowCollector<T>) { | |
withPolicy { internalChannelFlow.collect(collector) } | |
} | |
override suspend fun emit(value: T) { | |
withPolicy { internalChannel.send(value) } | |
} | |
private suspend fun withPolicy(block: suspend () -> Unit) { | |
return when (val policy = contextPolicy) { | |
is ContextPolicy.None -> { | |
if (!isOnMainImmediate()) { | |
Log.w( | |
TAG, | |
"ContextPolicy is set to None and not running on Main.immediate." + " This means, that you could lose events" | |
) | |
} | |
block() | |
} | |
is ContextPolicy.Require -> { | |
requireCurrentContextIsMainImmediate() | |
block() | |
} | |
is ContextPolicy.SwitchAutomatically -> { | |
// FAST PATH -> Not switching context, if we are already on Main.immediate | |
if (isOnMainImmediate()) { | |
block() | |
} else { | |
withContext(Dispatchers.Main.immediate) { block() } | |
} | |
} | |
is ContextPolicy.WithContext -> { | |
withContext(policy.coroutineContext) { block() } | |
} | |
} | |
} | |
private suspend fun isOnMainImmediate(): Boolean { | |
val currentContext = currentCoroutineContext()[ContinuationInterceptor] | |
return currentContext === Dispatchers.Main.immediate | |
} | |
private suspend fun requireCurrentContextIsMainImmediate() { | |
check(isOnMainImmediate()) { | |
val currentContext = currentCoroutineContext()[ContinuationInterceptor] | |
"Current coroutineContext is not Dispatchers.Main.immediate, it is $currentContext" | |
} | |
} | |
companion object { | |
private const val TAG = "EventChannelFlow" | |
} | |
} | |
/** | |
* Converts a [MutableEventChannelFlow] to an immutable [EventChannelFlow]. | |
* | |
* @return An immutable [EventChannelFlow]. | |
*/ | |
public fun <T> MutableEventChannelFlow<T>.asEventChannelFlow(): EventChannelFlow<T> { | |
return ReadonlyEventChannelFlow(this) | |
} | |
private class ReadonlyEventChannelFlow<T>( | |
flow: EventChannelFlow<T> | |
) : EventChannelFlow<T> by flow |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment