Skip to content

Instantly share code, notes, and snippets.

@kartoffelsup
Last active March 25, 2020 07:09
Show Gist options
  • Save kartoffelsup/c3e3b509d2645b41bae4abab0740b42b to your computer and use it in GitHub Desktop.
Save kartoffelsup/c3e3b509d2645b41bae4abab0740b42b to your computer and use it in GitHub Desktop.
ArrowFw Queue
import arrow.Kind
import arrow.core.Either
import arrow.core.ListK
import arrow.core.extensions.list.traverse.sequence
import arrow.core.fix
import arrow.fx.ForIO
import arrow.fx.IO
import arrow.fx.Queue
import arrow.fx.extensions.fx
import arrow.fx.extensions.io.concurrent.concurrent
import arrow.fx.extensions.io.dispatchers.dispatchers
import arrow.fx.fix
import arrow.fx.typeclasses.Concurrent
import com.google.common.util.concurrent.ThreadFactoryBuilder
import kotlinx.coroutines.time.delay
import java.time.Duration
import java.util.Collections
import java.util.concurrent.Executors
import kotlin.reflect.KClass
/**
 * Envelope placed on listener queues.
 *
 * @property event the payload handed to [EventBus.send], stored untyped.
 */
private class Event(
    val event: Any
)
/**
 * Event bus whose operations are expressed as effects in `F`.
 */
interface EventBus<F> {

    /**
     * Describes publishing [event] on the bus.
     *
     * @param event the value to publish.
     * @return an effect in `F` yielding `Unit`.
     */
    fun <T : Any> send(event: T): Kind<F, Unit>

    /**
     * Describes receiving one event of the given [type].
     *
     * @param type class token of the events the caller is interested in.
     * @return an effect in `F` yielding a value of type [T].
     */
    fun <T : Any> listen(type: KClass<T>): Kind<F, T>
}
/**
 * [EventBus] that gives every pending [listen] call its own bounded [Queue] of capacity 1.
 *
 * Events are dispatched by the exact runtime class of the sent value (`event::class`);
 * an event sent while no listener is registered for that class is dropped.
 *
 * The registry holds concrete [Queue] instances. Registering the `Kind<F, Queue<...>>`
 * returned by [Queue.bounded] instead would store a description of queue creation that is
 * evaluated again by every consumer, so the sender and the listener would not share a queue.
 *
 * @param CF the [Concurrent] instance used to build and compose all effects.
 */
class EventBusImpl<F>(private val CF: Concurrent<F>) : EventBus<F> {

    // Values are immutable snapshots replaced through compute/computeIfPresent on the
    // synchronized map, so `send` can iterate a snapshot while `listen` registers concurrently.
    private val listeners: MutableMap<KClass<*>, List<Queue<F, Event>>> =
        Collections.synchronizedMap(mutableMapOf())

    /**
     * Offers [event] to every queue currently registered for `event::class`.
     *
     * The registry is read when the returned effect runs, not when this function is called.
     *
     * @param event the value to publish.
     * @return an effect that completes once every registered queue has accepted the event.
     */
    override fun <T : Any> send(event: T): Kind<F, Unit> = CF.run {
        later { listeners[event::class].orEmpty() }.flatMap { queues ->
            queues
                .map { queue -> offerTo(queue, event) }
                .sequence(CF)
                .map { Unit }
        }
    }

    /**
     * Waits for the next event of [type].
     *
     * Running the returned effect creates a fresh queue, registers it, takes one event from it
     * and finally unregisters it again, so completed listeners do not accumulate and later
     * [send] calls never offer to a queue nobody reads from.
     *
     * @param type class token used as the registry key.
     * @return an effect yielding the payload of the first event taken from the queue.
     */
    @Suppress("UNCHECKED_CAST") // safe: a queue registered under `type` only receives events whose class is `type`
    override fun <T : Any> listen(type: KClass<T>): Kind<F, T> {
        println("foobar")
        return CF.run {
            Queue.bounded<F, Event>(1, CF).flatMap { queue ->
                later { register(type, queue) }
                    .flatMap {
                        println("take")
                        queue.take()
                    }
                    .map { event: Event ->
                        println("event: $event")
                        event.event as T
                    }
                    // NOTE(review): guarantee is expected to also run on error/cancellation — confirm
                    // against the Arrow Fx version in use.
                    .guarantee(later { unregister(type, queue) })
            }
        }
    }

    /** Wraps [payload] in an [Event], logs it and offers it to [queue]. */
    private fun offerTo(queue: Queue<F, Event>, payload: Any): Kind<F, Unit> = CF.run {
        later { Event(payload).also { println("Offer: $it") } }
            .flatMap { wrapped -> queue.offer(wrapped) }
    }

    /** Adds [queue] to the snapshot stored under [type]. */
    private fun register(type: KClass<*>, queue: Queue<F, Event>) {
        listeners.compute(type) { _, current -> current.orEmpty() + queue }
    }

    /** Removes [queue] from the snapshot stored under [type]; drops the key once it is empty. */
    private fun unregister(type: KClass<*>, queue: Queue<F, Event>) {
        listeners.computeIfPresent(type) { _, current ->
            // Returning null from computeIfPresent removes the mapping.
            (current - queue).takeIf { it.isNotEmpty() }
        }
    }
}
/** Marker event carrying no data; used as the demo payload sent over the bus. */
class Request
/**
 * Creates [Request]s and hands them to a caller-supplied handler.
 *
 * @param eventHandler function invoked with each new [Request]; its [IO] is returned unrun.
 */
class Producer(private val eventHandler: (Request) -> IO<Unit>) {

    /** Builds a new [Request] and returns the effect produced by the handler for it. */
    fun produce(): IO<Unit> = eventHandler(Request())
}
/** Receiving end of the demo: prints every [Request] it is given. */
class Consumer {

    /**
     * Prints a line announcing that [request] was consumed.
     *
     * @param request the event being consumed.
     */
    suspend fun onRequest(request: Request) {
        println("Event consumed: $request")
    }
}
/**
 * Demo entry point.
 *
 * Starts [deliverEvents] on a dedicated "bg-job-%d" executor thread, waits one second,
 * then sends a single [Request] through the bus and prints whether sending succeeded.
 */
suspend fun main() {
    val eventBus: EventBus<ForIO> = EventBusImpl(IO.concurrent())
    val consumer = Consumer()
    val producer = Producer { request -> eventBus.send(request).fix() }

    // Run the delivery loop in the background, blocking the executor thread on the IO.
    val bgThreadFactory = ThreadFactoryBuilder().setNameFormat("bg-job-%d").build()
    val backgroundExecutor = Executors.newSingleThreadExecutor(bgThreadFactory)
    val eventDeliveryJob = IO(IO.dispatchers().io()) { deliverEvents(eventBus, consumer) }
    backgroundExecutor.submit { eventDeliveryJob.unsafeRunSync() }
    // No further tasks will be submitted; the already submitted job keeps running.
    backgroundExecutor.shutdown()

    // Pause before producing so the background job has time to start.
    kotlinx.coroutines.delay(1000L)

    producer.produce()
        .attempt()
        .suspended()
        .fold(
            ifLeft = { System.err.println(it) },
            ifRight = { println("Event sent.") }
        )
}
/**
 * Endless delivery loop: waits for a [Request] on [eventBus], passes it to [consumer],
 * reports the outcome, sleeps five seconds and starts over.
 *
 * Implemented as a loop rather than by calling itself: the self-call came after other
 * suspension points, so it was not a tail call and every cycle kept one more continuation
 * frame alive for the lifetime of the job.
 *
 * This function does not return normally.
 *
 * @param eventBus bus to listen on for [Request] events.
 * @param consumer receiver invoked with every delivered [Request].
 */
suspend fun deliverEvents(
    eventBus: EventBus<ForIO>,
    consumer: Consumer
) {
    while (true) {
        println("Test")
        IO.fx {
            !effect { println("Checking for events...") }
            val event: Request = !eventBus.listen(Request::class)
            !effect { println("return event") }
            !effect { consumer.onRequest(event) }
            !effect { println("delivered event") }
        }.attempt()
            .suspended()
            // Failures are reported and the loop carries on with the next cycle.
            .fold(ifLeft = { System.err.println(it) }, ifRight = { println("Event") })
        println("argl")
        delay(Duration.ofSeconds(5L))
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment