Skip to content

Instantly share code, notes, and snippets.

@gaerfield
Last active May 19, 2021 09:49
Show Gist options
  • Save gaerfield/ac5e89b1348c5927d6f6ef61225f8760 to your computer and use it in GitHub Desktop.
Save gaerfield/ac5e89b1348c5927d6f6ef61225f8760 to your computer and use it in GitHub Desktop.
Simple Kotlin-EventBus including Unit-Tests
package infrastructure
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.flow.*
/**
 * A minimal in-process event bus: events of any type are published with [send] and consumed,
 * filtered by type, through one of the [on] overloads.
 */
internal class EventBus {
    // The channel is seeded with a placeholder object because a new subscriber is handed the
    // current element right away, even though it was sent before the subscription. Seeding
    // guarantees such an element always exists, so every subscription can unconditionally drop
    // its first emission and thereby only see events sent after it subscribed.
    // NOTE(review): a conflated channel keeps only the most recent element, so a subscriber that
    // lags behind rapid sends may miss intermediate events - confirm this is acceptable.
    private val bus = ConflatedBroadcastChannel<Any>(object {})

    /**
     * Send an arbitrary event.
     *
     * @param o the event to publish to all current subscribers
     */
    suspend fun send(o: Any): Unit = bus.send(o)

    /**
     * Listen to a flow of events of type [T]. Events of any other type are skipped, and the
     * element that was current at subscription time is dropped.
     *
     * Usage may look like:
     * ```
     * suspend fun startListening() = coroutineScope {
     *     launch {
     *         eventBus.on<MyEvent>().collect { event -> actOnMyEvent(event) }
     *     }
     * }
     *
     * fun actOnMyEvent(event: MyEvent) { /* ... */ }
     * ```
     *
     * @return a flow emitting every subsequently sent event that is an instance of [T]
     */
    inline fun <reified T : Any> on(): Flow<T> =
        bus.asFlow()
            .drop(1) // skip the element replayed on subscription (placeholder or an older event)
            .filterIsInstance<T>()

    /**
     * Register an event handler. The handler is called asynchronously from a coroutine that is
     * launched in [scope].
     *
     * Keep in mind that by default the handler operates for the whole application lifetime and is
     * not cancelled prematurely, because it launches a top-level coroutine in [GlobalScope].
     *
     * When handlers are registered dynamically, pass the scope that owns the listener and keep the
     * returned [Job] to stop listening:
     *
     * ```
     * class Component(private val eventBus: EventBus) {
     *     private var listener: Job? = null
     *     fun startListening(scope: CoroutineScope) {
     *         listener = eventBus.on<FrontDesk.Ordered>(scope) { event -> actOnMyEvent(event) }
     *     }
     *     fun stopListening() { listener?.cancel() }
     *
     *     fun actOnMyEvent(event: FrontDesk.Ordered) { /* ... */ }
     * }
     * ```
     *
     * Registering a handler for the whole application lifetime may look like:
     *
     * ```
     * class Component(eventBus: EventBus) {
     *     init {
     *         eventBus.on<MyEvent> { event -> actOnMyEvent(event) }
     *     }
     *
     *     fun actOnMyEvent(event: MyEvent) { /* ... */ }
     * }
     * ```
     *
     * @param scope the scope the collecting coroutine is launched in; defaults to [GlobalScope]
     * @param action invoked for every received event of type [T]
     * @return the [Job] of the collecting coroutine; cancel it to unregister the handler
     */
    inline fun <reified T : Any> on(
        scope: CoroutineScope = GlobalScope,
        crossinline action: suspend (value: T) -> Unit
    ): Job = scope.launch {
        on<T>().collect(action)
    }
}
package infrastructure
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runBlockingTest
import java.util.*
import kotlin.test.Test
import kotlin.test.assertEquals
/**
 * Test helper: subscribes to events of type [T] on this bus and starts recording them in a
 * coroutine launched in [scope].
 *
 * @return the already-launched [EventCollector]; call [EventCollector.finish] to obtain the
 * recorded events
 */
internal suspend inline fun <reified T : Any> EventBus.recordEvents(scope: CoroutineScope): EventCollector<T> {
    val events = on<T>()
    return EventCollector(events).launch(scope)
}
/**
 * Collects every element emitted by [flow] into an in-memory list until [finish] is called.
 */
internal class EventCollector<T>(
    private val flow: Flow<T>
) {
    // Elements recorded so far, in the order they were collected.
    private val values = mutableListOf<T>()

    // Job of the collecting coroutine; assigned by launch(), so finish() must not be called first.
    private lateinit var job: Job

    /**
     * Starts collecting [flow] in a new coroutine launched in [scope].
     *
     * @return this collector, to allow chaining
     */
    suspend fun launch(scope: CoroutineScope): EventCollector<T> = apply {
        job = scope.launch {
            flow.collect { element -> values += element }
        }
    }

    /**
     * Cancels the collecting coroutine and returns a snapshot of everything recorded so far.
     */
    fun finish(): List<T> = values.run {
        job.cancel()
        toList()
    }
}
/**
 * Unit tests for [EventBus]. Each test subscribes one or more recording collectors, sends events
 * and compares what the collectors recorded.
 */
internal class EventBusTest {
    /** Event type the collectors subscribe to; content is random by default so instances differ. */
    data class TestEvent(val content: String = UUID.randomUUID().toString())

    /** A second event type, used to verify that subscriptions filter by type. */
    data class SecondTestEvent(val content: String = UUID.randomUUID().toString())

    @Test
    fun `subscribing to TestEvent and receiving one Event`() = runBlockingTest {
        val bus = EventBus()
        val collector = bus.recordEvents<TestEvent>(this)
        val sent = TestEvent()

        bus.send(sent)

        val received = collector.finish()
        assertEquals(listOf(sent), received)
    }

    @Test
    fun `dont receive other type of Events`() = runBlockingTest {
        val bus = EventBus()
        val collector = bus.recordEvents<TestEvent>(this)

        // An event of a different type must not reach a TestEvent subscription.
        bus.send(SecondTestEvent())

        assertEquals(emptyList<TestEvent>(), collector.finish())
    }

    @Test
    fun `receive events in the correct order`() = runBlockingTest {
        val bus = EventBus()
        val collector = bus.recordEvents<TestEvent>(this)
        val sent = listOf(TestEvent(), TestEvent())

        for (event in sent) {
            bus.send(event)
        }

        assertEquals(sent, collector.finish())
    }

    @Test
    fun `dont receive Events before subscription`() = runBlockingTest {
        val bus = EventBus()
        // Sent before anyone subscribed: must not show up in the recording below.
        bus.send(TestEvent())

        val collector = bus.recordEvents<TestEvent>(this)
        val sentAfterSubscribing = TestEvent()
        bus.send(sentAfterSubscribing)

        assertEquals(listOf(sentAfterSubscribing), collector.finish())
    }

    @Test
    fun `dont receive same Events twice`() = runBlockingTest {
        val bus = EventBus()
        val firstCollector = bus.recordEvents<TestEvent>(this)
        val sent = TestEvent()

        bus.send(sent)
        assertEquals(listOf(sent), firstCollector.finish())

        // A collector subscribing afterwards must not be handed the already delivered event.
        val lateCollector = bus.recordEvents<TestEvent>(this)
        assertEquals(emptyList<TestEvent>(), lateCollector.finish())
    }

    @Test
    fun `same Event can be received by different listeners`() = runBlockingTest {
        val bus = EventBus()
        val firstCollector = bus.recordEvents<TestEvent>(this)
        val secondCollector = bus.recordEvents<TestEvent>(this)
        val sent = TestEvent()

        bus.send(sent)

        assertEquals(listOf(sent), firstCollector.finish())
        assertEquals(listOf(sent), secondCollector.finish())
    }
}
@gaerfield
Copy link
Author

Inspired by takahirom with modifications on:

  • on subscription, no event is delivered that was sent before the subscription
  • uses (experimental) flow-api

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment