Skip to content

Instantly share code, notes, and snippets.

@parahall

parahall/CDataStream.kt

Last active Dec 29, 2019
Embed
What would you like to do?
class CDataStream<T>(
private val taskRunner: ITaskRunner
) : IDataStream<T> {
private val channel = BroadcastChannel<T>(Channel.CONFLATED)
private var value: T? = null
override fun observe(observer: (T) -> Unit): IObservationManager {
val subscription = channel.openSubscription()
this.value?.let(observer)
taskRunner.run {
if (!channel.isClosedForSend) {
subscription.consumeEach {
observer(it)
}
}
}
return CObservationManager(subscription)
}
override fun setValue(value: T) {
this.value = value
taskRunner.run {
if (!channel.isClosedForSend) {
channel.send(value)
}
}
}
override fun close() {
try {
if (!channel.isClosedForSend) {
channel.cancel()
} else {
Timber.e("[CDataStream], close: called but channel already closed")
}
} catch (t: Throwable) {
Timber.e(t, "[CDataStream], close: exception while trying to close channel")
}
}
override fun observeOneTime(observer: (T) -> Unit) {
val subscription = channel.openSubscription()
taskRunner.run {
if(!channel.isClosedForSend) {
val value = subscription.receive()
subscription.cancel()
observer(value)
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.