Skip to content

Instantly share code, notes, and snippets.

@fluidsonic
Created February 16, 2021 19:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fluidsonic/4bf5aca6d84117f371d84894ccb976f4 to your computer and use it in GitHub Desktop.
Save fluidsonic/4bf5aca6d84117f371d84894ccb976f4 to your computer and use it in GitHub Desktop.
import java.util.concurrent.*
import kotlin.reflect.*
import kotlinx.coroutines.*
/**
 * Event emitter whose [emit] suspends until every matching subscriber has finished handling the event.
 *
 * Each subscriber's action runs in the [CoroutineScope] it was registered with. All matching handlers are
 * started first and only then awaited, so handlers of different subscribers may run concurrently.
 *
 * @param onError invoked with the failure and the event being handled whenever a subscriber's action throws.
 *   Cancellation of a subscriber's scope is not reported as an error.
 */
class SynchronousEventEmitter(
    private val onError: (error: Throwable, event: Event) -> Unit,
) : EventEmitter, EventSource {

    // Copy-on-write so that `emit` iterates a stable snapshot while subscribers come and go.
    private val subscriptions = CopyOnWriteArraySet<Subscription<*>>()


    /**
     * Dispatches [event] to every subscription whose event class matches and suspends until all of them
     * have completed.
     *
     * A handler that was cancelled because its subscriber's scope was cancelled is skipped silently;
     * cancellation of the caller itself is still propagated.
     */
    override suspend fun emit(event: Event) {
        subscriptions
            .mapNotNull { it.dispatchAsync(event) }
            .forEach { handler ->
                try {
                    handler.await()
                }
                catch (cancellation: kotlinx.coroutines.CancellationException) {
                    // Either the caller was cancelled (rethrow) or only the subscriber's scope was
                    // cancelled (ignore) - a subscriber going away must not cancel the emitter.
                    currentCoroutineContext().ensureActive()
                }
            }
    }


    /**
     * Registers [action] to be invoked in [scope] for every emitted event that is an instance of [event].
     *
     * The subscription is removed automatically when the job of [scope] completes, or earlier by calling
     * [EventSubscription.unsubscribe] on the returned handle.
     */
    override fun <TEvent : Event> subscribeIn(scope: CoroutineScope, event: KClass<out TEvent>, action: suspend (TEvent) -> Unit): EventSubscription {
        // Resolve the job before registering: `job` throws for a scope without a Job, and a subscription
        // registered before that failure would never be removed again.
        val job = scope.coroutineContext.job

        val subscription = Subscription(action = action, eventClass = event, scope = scope)
        subscriptions += subscription

        // If the job has already completed, the handler runs immediately and removes the subscription.
        subscription.completionHandle = job.invokeOnCompletion {
            subscription.unsubscribe()
        }

        return subscription
    }


    /** A single registration of [action] for events of [eventClass], executed in [scope]. */
    private inner class Subscription<in TEvent : Event>(
        private val action: suspend (TEvent) -> Unit,
        private val eventClass: KClass<out TEvent>,
        private val scope: CoroutineScope,
    ) : EventSubscription {

        /** Handle of the completion handler registered on the scope's job; disposed on [unsubscribe]. */
        @Volatile
        var completionHandle: DisposableHandle? = null


        /**
         * Starts [action] for [event] in [scope] if the event is an instance of [eventClass].
         *
         * @return the deferred completion of the handler, or `null` if the event does not match.
         */
        @Suppress("NAME_SHADOWING")
        fun dispatchAsync(event: Event): Deferred<Unit>? {
            val event = eventClass.safeCast(event) ?: return null

            return scope.async {
                try {
                    action(event)
                }
                catch (error: Throwable) {
                    // Cancellation of this handler's coroutine is not a handler failure: let it propagate
                    // so the coroutine completes as cancelled instead of reporting it through onError.
                    if (error is kotlinx.coroutines.CancellationException && !isActive)
                        throw error

                    onError(error, event)
                }
            }
        }


        /** Removes this subscription and detaches its completion handler from the scope's job. */
        override fun unsubscribe() {
            subscriptions -= this

            // Without this the handler (and this subscription) would stay referenced by the job
            // until the scope completes, even though the subscription is already gone.
            completionHandle?.dispose()
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment