Skip to content

Instantly share code, notes, and snippets.

@L-Briand
Last active May 4, 2023 13:33
Show Gist options
  • Save L-Briand/d6795aa6960aa825a648076b7c08a413 to your computer and use it in GitHub Desktop.
Save L-Briand/d6795aa6960aa825a648076b7c08a413 to your computer and use it in GitHub Desktop.
Observable pattern with kotlinx coroutine
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.lang.ref.WeakReference
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.CoroutineContext
typealias Observer<T> = suspend (old: T, new: T) -> Unit
fun <T> observe(block: suspend (T) -> Unit): Observer<T> = { _, new -> block(new) }
fun <T> observeUpdate(block: suspend (T, T) -> Unit): Observer<T> = block
/**
* A coroutine safe observable.
*
* To manipulate the underlying value uses [update], [get] or [getUnsafe].
* You can also [add], [remove] or [clear] [Observer]s.
*
*/
class Observable<T> private constructor(defaultValue: T) {
companion object {
fun <T> make(
defaultValue: T,
notificationDispatcher: CoroutineContext = Dispatchers.IO,
): Observable<T> {
val observable = Observable(defaultValue)
val weak = WeakReference(observable)
notificationTask(weak, notificationDispatcher)
return observable
}
/**
* Launch a coroutine calling observers when a new value is set.
* It do not reference the observable. This way, when the observable is garbage collected,
* the notification tasks ends.
*/
private fun <T> notificationTask(
weak: WeakReference<Observable<T>>,
dispatcher: CoroutineContext,
) {
CoroutineScope(dispatcher).launch {
while (true) {
// Try to get new value from channel
val new = try {
weak.get()?.valueChannel?.receive() ?: break
} catch (e: ClosedReceiveChannelException) {
break
}
if (!isActive) break
// If a value is found, notify attached observers
weak.get()?.also { obs ->
val old = obs._value
val observers = obs.observersMutex.withLock { obs.observers.toTypedArray() }
for (observer in observers)
runCatching { observer(old, new) }.onFailure { it.printStackTrace() }
obs._value = new
} ?: break
}
}
}
}
/** Hold values needed to be notified. */
private var valueChannel: Channel<T> = Channel(Channel.BUFFERED)
private var _value: T = defaultValue
/** Hold the latest set value. */
var value: T
get() = _value
set(value) = update(value) ?: Unit
fun update(value: T) = valueChannel.trySend(value).getOrNull()
/** List of all listeners that need to be notified when a new value is set. */
private val observers = mutableListOf<Observer<T>>()
private val observersMutex = Mutex()
/** Blocks until the the observer is added */
fun addBlocking(observer: Observer<T>, notifyCurrentValue: Boolean = false) =
runBlocking { add(observer, notifyCurrentValue) }
/** Add an observer */
suspend fun add(observer: Observer<T>, notifyCurrentValue: Boolean = false) {
observersMutex.withLock { observers.add(observer) }
if (notifyCurrentValue) observer.invoke(value, value)
}
/** Blocks until the the observer is removed */
fun removeBlocking(observer: Observer<T>) = runBlocking { remove(observer) }
/** Remove an observer */
suspend fun remove(observer: Observer<T>) = observersMutex.withLock { observers.remove(observer) }
/** Remove all observers */
suspend fun clear() = observersMutex.withLock { observers.clear() }
private val isClose = AtomicBoolean(false)
fun close() {
isClose.set(true)
valueChannel.close()
}
}
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
/**
* Simplify the process of listening serially with a listener.
*
* Generally when you want to listen to events in coroutines you have to :
* - Add a listener to an object.
* - This listener publish events to a channel.
* - You launch a coroutine and listen to the channel events.
*
*/
interface ChannelListener<T, L> {
/** The channel where the [listener] publish events */
val channel: Channel<T>
/** The listener that will receive events. */
val listener: L
/**
* Some kind of way to close the events feed.
*
* By default it only close the channel but implementations
* can use it to remove listener for event producer as well.
*/
fun close(): Boolean = channel.close()
companion object {
/**
* Generic way to create a ChannelListener.
*
* Usage :
* ```kotlin
* interface MyListener {
* fun callback1(value: String)
* fun end()
* }
*
* val cl = ChannelListener.makeGeneric<String?, MyListener> { channel ->
* object : MyListener {
* override fun callback1(value: String) {
* channel.trySend(value)
* }
* override fun end() {
* channel.trySend(null)
* }
* }
* }
*
* addListener(cl.listener)
*
* runBlocking {
* while (true) {
* when(val newEvent = cl.channel.receive()) {
* null -> break
* else -> println(newEvent)
* }
* }
* }
*
* removeListener(cl.listener)
* ```
*/
inline fun <reified T, reified L> makeGeneric(
capacity: Int = Channel.BUFFERED,
crossinline block: (Channel<T>) -> L
): ChannelListener<T, L> = object : ChannelListener<T, L> {
override val channel: Channel<T> = Channel(capacity)
override val listener: L = block(channel)
}
}
}
/**
* Create a channel listener from an Observable.
* Each time a notification occurs, the channel is feed with the value.
*/
suspend inline fun <reified T> Observable<T>.makeChannelListener(
notifyCurrentValue: Boolean = false,
capacity: Int = Channel.BUFFERED,
): ChannelListener<T, Observer<T>> {
val result = object : ChannelListener<T, Observer<T>> {
override val channel: Channel<T> = Channel(capacity)
override val listener: Observer<T> = observe { channel.trySend(it) }
override fun close(): Boolean {
val r1 = runBlocking { remove(listener) }
val r2 = channel.close()
return r1 && r2
}
}
add(result.listener, notifyCurrentValue)
return result
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment