Skip to content

Instantly share code, notes, and snippets.

@nomisRev
Created March 27, 2020 10:22
Show Gist options
  • Save nomisRev/f5aff9ac61a466d7de140480290311a8 to your computer and use it in GitHub Desktop.
Save nomisRev/f5aff9ac61a466d7de140480290311a8 to your computer and use it in GitHub Desktop.
EventBus, question on KotlinLang Slack
import arrow.Kind
import arrow.fx.ForIO
import arrow.fx.IO
import arrow.fx.IOOf
import arrow.fx.Queue
import arrow.fx.extensions.fx
import arrow.fx.extensions.io.concurrent.concurrent
import arrow.fx.extensions.io.dispatchers.dispatchers
import arrow.fx.fix
import arrow.fx.typeclasses.Concurrent
import arrow.fx.typeclasses.milliseconds
import arrow.fx.typeclasses.seconds
import kotlin.reflect.KClass
private class Event(val event: Any)
interface EventBus<F> {
fun <T : Any> send(event: T): Kind<F, Unit>
fun <T : Any> listen(type: KClass<T>): Kind<F, T>
}
fun <F> EventBus(CF: Concurrent<F>): Kind<F, EventBus<F>> = CF.run {
Ref(emptyMap<KClass<*>, List<Queue<F, Event>>>()).map { ref ->
object : EventBus<F> {
override fun <T : Any> send(event: T): Kind<F, Unit> =
ref.get().map { it[event::class] }.flatMap { qs ->
qs?.map { queue ->
queue.offer(Event(event))
}?.parSequence()?.void() ?: unit()
}
override fun <T : Any> listen(type: KClass<T>): Kind<F, T> =
Queue.bounded<F, Event>(1, CF).flatMap { q ->
ref.update { types -> types.update(type) { it + q } }
.followedBy(q.take().map { event -> event.event as T })
}
}
}
}
private fun <F> Map<KClass<*>, List<Queue<F, Event>>>.update(
key: KClass<*>,
update: (List<Queue<F, Event>>) -> List<Queue<F, Event>>
): Map<KClass<*>, List<Queue<F, Event>>> {
val original = get(key) ?: emptyList()
val new = update(original)
return this + Pair(key, new)
}
class Request
class Producer(private val eventHandler: (Request) -> IOOf<Unit>) {
fun produce(): IO<Unit> = eventHandler(Request()).fix()
}
class Consumer {
suspend fun onRequest(request: Request): Unit =
println("Event consumed: $request.")
}
suspend fun main() = IO.fx {
val eventBus = !EventBus(IO.concurrent())
val consumer = Consumer()
!deliverEvents(eventBus, consumer).fork(IO.dispatchers().io()) // Fire & forget
!IO.sleep(1000.milliseconds)
!Producer { request ->
eventBus.send(request)
}.produce()
!effect { println("Event sent.") }
}.suspended()
fun deliverEvents(
eventBus: EventBus<ForIO>,
consumer: Consumer
): IO<Unit> = IO.fx {
val event: Request = !eventBus.listen(Request::class)
val e = !effect { consumer.onRequest(event) }
!effect { println("e") }
!IO.sleep(5.seconds)
!deliverEvents(eventBus, consumer)
}
@nomisRev
Copy link
Author

Refactor to Queue.unbounded instead of List<Queue<F, Event>>.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment