Skip to content

Instantly share code, notes, and snippets.

@ZakTaccardi
Created May 5, 2020 16:59
Show Gist options
  • Save ZakTaccardi/64e7e70fb75ea5cba48a3797d75e6fe4 to your computer and use it in GitHub Desktop.
Save ZakTaccardi/64e7e70fb75ea5cba48a3797d75e6fe4 to your computer and use it in GitHub Desktop.
StateActor
import kotlinx.coroutines.CompletionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.flow.Flow
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
interface StateActor<S, I> : SendChannel<I> {
/**
* Obverse the state exposed by this actor as it changes over time. While a value will always be emitted upon
* subscription, *when* it emits is controlled by [FirstEmission]
*
* To update this state, use:
* - [offer]
* - [send]
*/
val states: Flow<S>
/**
* Thread safe access to the current state ([S]). If there are pending intentions ([I]) when this function is
* called, emissions will be delayed until those intentions are processed.
*
* To update this state, use:
* - [offer]
* - [send]
*/
suspend fun currentState(): S
/**
* Controls how the `Flow<T>` from `[StateActor.states] will emit it's first emission.
*/
enum class FirstEmission {
/**
* The [StateActor.states] will emit its current value immediately upon subscription, even if that current value
* is stale due to pending intentions that have been submitted but not yet processed.
*/
IMMEDIATE,
/**
* When [StateActor.states] is subscribed to, it will wait for any pending intentions to be processed before its
* first value is emitted. At this moment, the first value will be the latest current state with those intentions
* accounted for.
*/
SAFE
}
}
/**
* Return a new state [S] based on the `currentState` and a new `intention` [I].
*/
typealias StateActorReducer<S, I> = suspend CoroutineScope.(currentState: S, intention: I) -> S
/**
* Construct a new [StateActor]
* - overload that provides a [initialState] immediately
*
* @param firstEmission see [FirstEmission]
* @param reduce used to update the state
*
* @see [actor]
* @param context - see [actor]
* @param capacity - see [actor]
* @param start - see [actor]
* @param onCompletion - see [actor]
*/
fun <S, I> CoroutineScope.stateActor(
firstEmission: FirstEmission,
initialState: S,
logger: LoggerForClass = noOpLogger,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
start: CoroutineStart = CoroutineStart.DEFAULT,
onCompletion: CompletionHandler? = null,
reduce: StateActorReducer<S, I>
): StateActor<S, I> = StateActorImpl(
scope = this,
firstEmission = firstEmission,
initialStateProvider = InitialStateProvider.AlreadyProvided(initialState),
logger = logger.first,
kClass = logger.second,
context = context,
capacity = capacity,
start = start,
onCompletion = onCompletion,
reduce = reduce
)
/**
* Construct a new [StateActor]
* - overload that provides an initial when the actor is first started
*
* @param firstEmission see [FirstEmission]
*
* @param reduce used to update the state
*
* @see [actor]
* @param context - see [actor]
* @param capacity - see [actor]
* @param start - see [actor]
* @param onCompletion - see [actor]
*/
fun <S, I> CoroutineScope.stateActor(
firstEmission: FirstEmission,
provideInitialState: suspend () -> S,
logger: LoggerForClass = noOpLogger,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
start: CoroutineStart = CoroutineStart.DEFAULT,
onCompletion: CompletionHandler? = null,
reduce: StateActorReducer<S, I>
): StateActor<S, I> = StateActorImpl(
scope = this,
firstEmission = firstEmission,
initialStateProvider = InitialStateProvider.SuspendingProvider(provideInitialState),
logger = logger.first,
kClass = logger.second,
context = context,
capacity = capacity,
start = start,
onCompletion = onCompletion,
reduce = reduce
)
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CompletionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.selects.SelectClause2
import kotlin.coroutines.CoroutineContext
import kotlin.reflect.KClass
/**
* Implementation of [StateActor]
*/
internal class StateActorImpl<S, I>(
scope: CoroutineScope,
firstEmission: StateActor.FirstEmission,
initialStateProvider: InitialStateProvider<S>,
logger: IdentityLogger,
kClass: KClass<*>,
context: CoroutineContext,
capacity: Int,
start: CoroutineStart,
onCompletion: CompletionHandler?,
reduce: StateActorReducer<S, I>
) : StateActor<S, I> {
@ExperimentalCoroutinesApi
private val stateChannel = when (initialStateProvider) {
is InitialStateProvider.AlreadyProvided -> {
val initialState = initialStateProvider.state
logger.asStateLogger(kClass)
.log(initialState)
ConflatedBroadcastChannel(initialStateProvider.state)
}
is InitialStateProvider.SuspendingProvider -> {
// initial state will be provided late when actor is initialized
ConflatedBroadcastChannel()
}
}
private val actor = scope.actor<StateActorIntention<S, I>>(
context = context,
capacity = capacity,
start = start,
onCompletion = onCompletion
) {
val stateLogger = logger.asStateLogger(kClass)
val intentionLogger = logger.asIntentionLogger(kClass)
val getLogger = logger.asLabelLogger(kClass, "Get")
val stateChannelFlow = stateChannel.asFlow()
var currentState: S = when (initialStateProvider) {
is InitialStateProvider.AlreadyProvided -> stateChannel.value
is InitialStateProvider.SuspendingProvider -> {
val initialState = initialStateProvider.provide()
stateChannel.send(initialState)
stateLogger.log(initialState)
initialState
}
}
for (intention in channel) {
when (intention) {
is Actual -> {
intentionLogger.log(intention.intention)
currentState = reduce(
this@actor,
currentState,
intention.intention
)
if (stateChannel.value === currentState) {
// no need to update the channel, new state is the same instance as the old state
} else {
stateChannel.send(currentState)
}
}
is StateActorIntention.Get -> {
getLogger.log(currentState)
intention.deferred.complete(currentState)
}
is StateActorIntention.Observe -> {
intention.deferred.complete(stateChannelFlow)
currentState
}
}
if (intention is StateActorIntention.Observe) {
intention.deferred.complete(stateChannelFlow)
}
}
}
override val states: Flow<S> = when (firstEmission) {
IMMEDIATE -> stateChannel.asFlow()
SAFE -> flow {
val intention = StateActorIntention.Observe<S, I>()
actor.send(intention)
emitAll(intention.deferred.await())
}
}
override suspend fun currentState(): S {
val intention = StateActorIntention.Get<S, I>()
actor.send(intention)
return intention.deferred.await()
}
private val _onSend: SelectClause2<StateActorIntention<S, I>, SendChannel<StateActorIntention<S, I>>> = actor.onSend
@ExperimentalCoroutinesApi
override val isClosedForSend: Boolean
get() = actor.isClosedForSend
override fun close(cause: Throwable?): Boolean = actor.close(cause)
@ExperimentalCoroutinesApi
override fun invokeOnClose(handler: (cause: Throwable?) -> Unit) = actor.invokeOnClose(handler)
override fun offer(element: I): Boolean = actor.offer(StateActorIntention.Actual(element))
override suspend fun send(element: I) = actor.send(StateActorIntention.Actual(element))
@ExperimentalCoroutinesApi
override val isFull: Boolean get() = throw NotImplementedError("Deprecated")
}
internal sealed class InitialStateProvider<S> {
data class AlreadyProvided<S>(val state: S) : InitialStateProvider<S>()
data class SuspendingProvider<S>(val provide: suspend () -> S) : InitialStateProvider<S>()
}
/**
* An internal wrapper around [Actual] that allows use to support [StateActor] features
*/
private sealed class StateActorIntention<S, I> {
data class Actual<S, I>(val intention: I) : StateActorIntention<S, I>()
/**
* Supports [StateActor.currentState]
*/
data class Get<S, I>(val deferred: CompletableDeferred<S> = CompletableDeferred()) : StateActorIntention<S, I>()
/**
* Supports [StateActor.states] and [StateActor.FlowFirstEmissionBehavior]
*/
data class Observe<S, I>(val deferred: CompletableDeferred<Flow<S>> = CompletableDeferred()) : StateActorIntention<S, I>()
}
internal val noOpLogger: LoggerForClass = NoOpIdentityLogger().forClass(Nothing::class)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment