Skip to content

Instantly share code, notes, and snippets.

@parahall
Last active December 29, 2019 16:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save parahall/9baf26c91206b1a3c0893e13c0d1bb3f to your computer and use it in GitHub Desktop.
Save parahall/9baf26c91206b1a3c0893e13c0d1bb3f to your computer and use it in GitHub Desktop.
class CDataStream<T>(
private val taskRunner: ITaskRunner
) : IDataStream<T> {
private val channel = BroadcastChannel<T>(Channel.CONFLATED)
private var value: T? = null
override fun observe(observer: (T) -> Unit): IObservationManager {
val subscription = channel.openSubscription()
this.value?.let(observer)
taskRunner.run {
if (!channel.isClosedForSend) {
subscription.consumeEach {
observer(it)
}
}
}
return CObservationManager(subscription)
}
override fun setValue(value: T) {
this.value = value
taskRunner.run {
if (!channel.isClosedForSend) {
channel.send(value)
}
}
}
override fun close() {
try {
if (!channel.isClosedForSend) {
channel.cancel()
} else {
Timber.e("[CDataStream], close: called but channel already closed")
}
} catch (t: Throwable) {
Timber.e(t, "[CDataStream], close: exception while trying to close channel")
}
}
override fun observeOneTime(observer: (T) -> Unit) {
val subscription = channel.openSubscription()
taskRunner.run {
if(!channel.isClosedForSend) {
val value = subscription.receive()
subscription.cancel()
observer(value)
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment