Skip to content

Instantly share code, notes, and snippets.

@agiuliano
Created January 23, 2021 14:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save agiuliano/0e47d7c385995e792c49ba7d981bd233 to your computer and use it in GitHub Desktop.
Save agiuliano/0e47d7c385995e792c49ba7d981bd233 to your computer and use it in GitHub Desktop.
package kotlin_playground
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.sendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import kotlin.reflect.KClass
/**
 * Simple in-process event bus built on a buffered [BroadcastChannel].
 *
 * Producers hand events to [notify]; consumers obtain a [Flow] restricted to
 * one event type through [subscribe].
 */
class EventBus {
    // Single underlying channel; the two properties below expose its
    // write side and its read side under narrower types.
    private val broadcast = BroadcastChannel<Event>(Channel.BUFFERED)
    private val publisher: SendChannel<Event> = broadcast
    private val receiver: Flow<Event> = broadcast.asFlow()

    /**
     * Reified convenience overload: `subscribe<MyEvent>()` is equivalent to
     * `subscribe(MyEvent::class)`.
     */
    inline fun <reified E : Event> subscribe(): Flow<E> {
        return subscribe(E::class)
    }

    /**
     * Returns a flow that emits only those events that are instances of [eventClass].
     *
     * @param eventClass runtime class used to select matching events.
     * @return the bus's event flow filtered down to [E].
     */
    // The cast is unchecked for the compiler only: the filter lets through
    // nothing but instances of eventClass.
    @Suppress("UNCHECKED_CAST")
    fun <E : Event> subscribe(eventClass: KClass<E>): Flow<E> =
        receiver.filter { candidate -> eventClass.isInstance(candidate) } as Flow<E>

    /**
     * Publishes [event] to the bus using a blocking send.
     *
     * @param event the event to deliver to subscribers.
     */
    fun notify(event: Event) {
        // Blocking send: as the original author notes, this can cause back-pressure
        // when downstream needs time to process events that are already queued.
        publisher.sendBlocking(event)
    }
}
/**
 * Marker type for values carried by [EventBus].
 *
 * Declares no members; concrete event classes implement it so they can be
 * passed to `EventBus.notify` and selected by type in `EventBus.subscribe`.
 */
interface Event
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment