Last active
March 25, 2020 07:09
-
-
Save kartoffelsup/c3e3b509d2645b41bae4abab0740b42b to your computer and use it in GitHub Desktop.
ArrowFw Queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import arrow.Kind | |
import arrow.core.Either | |
import arrow.core.ListK | |
import arrow.core.extensions.list.traverse.sequence | |
import arrow.core.fix | |
import arrow.fx.ForIO | |
import arrow.fx.IO | |
import arrow.fx.Queue | |
import arrow.fx.extensions.fx | |
import arrow.fx.extensions.io.concurrent.concurrent | |
import arrow.fx.extensions.io.dispatchers.dispatchers | |
import arrow.fx.fix | |
import arrow.fx.typeclasses.Concurrent | |
import com.google.common.util.concurrent.ThreadFactoryBuilder | |
import kotlinx.coroutines.time.delay | |
import java.time.Duration | |
import java.util.Collections | |
import java.util.concurrent.Executors | |
import kotlin.reflect.KClass | |
private class Event(val event: Any) | |
interface EventBus<F> { | |
fun <T : Any> send(event: T): Kind<F, Unit> | |
fun <T : Any> listen(type: KClass<T>): Kind<F, T> | |
} | |
class EventBusImpl<F>(private val CF: Concurrent<F>) : EventBus<F> { | |
private val listeners: MutableMap<KClass<*>, MutableList<Kind<F, Queue<F, Event>>>> = | |
Collections.synchronizedMap(mutableMapOf()) | |
override fun <T : Any> send(event: T): Kind<F, Unit> = CF.run { | |
val t: Kind<F, Kind<F, Unit>> = CF.later { | |
val a: MutableList<Kind<F, Queue<F, Event>>>? = listeners[event::class] | |
val b: List<Kind<F, Unit>>? = | |
a?.map { kq: Kind<F, Queue<F, Event>> -> kq.flatMap { queue -> val even1 = Event(event) | |
println("Offer: $even1") | |
queue.offer(even1) } } | |
val c: Kind<F, ListK<Unit>>? = b?.sequence(this)?.map { it.fix() } | |
val d: Kind<F, Unit>? = c?.followedBy(unit()) | |
val e: Kind<F, Unit> = d ?: unit() | |
return@later e | |
} | |
t.flatten() | |
} | |
override fun <T : Any> listen(type: KClass<T>): Kind<F, T> { | |
val queue: Kind<F, Queue<F, Event>> = Queue.bounded(1, CF) | |
listeners.compute(type) { t, u -> | |
val list = u ?: mutableListOf() | |
list.add(queue) | |
list | |
} | |
println("foobar") | |
return CF.run { | |
queue.flatMap { | |
println("take") | |
it.take().map { event: Event -> | |
println("event: $event") | |
event.event as T | |
} | |
} | |
} | |
} | |
} | |
class Request {} | |
class Producer(private val eventHandler: (Request) -> IO<Unit>) { | |
fun produce(): IO<Unit> { | |
return eventHandler(Request()) | |
} | |
} | |
class Consumer { | |
suspend fun onRequest(request: Request): Unit { | |
println("Event consumed: $request") | |
} | |
} | |
suspend fun main() { | |
val eventBus: EventBus<ForIO> = EventBusImpl(IO.concurrent()) | |
val eventHandler: (Request) -> IO<Unit> = { request -> | |
IO.run { eventBus.send(request) }.fix() | |
} | |
val consumer = Consumer() | |
val backgroundExecutor = Executors.newSingleThreadExecutor( | |
ThreadFactoryBuilder() | |
.setNameFormat("bg-job-%d") | |
.build() | |
) | |
val eventDeliveryJob = IO(IO.dispatchers().io()) { | |
deliverEvents(eventBus, consumer) | |
} | |
backgroundExecutor.submit { eventDeliveryJob.unsafeRunSync() } | |
backgroundExecutor.shutdown() | |
kotlinx.coroutines.delay(1000L) | |
val producer = Producer(eventHandler) | |
val attempt: Either<Throwable, Unit> = producer.produce().attempt().suspended() | |
attempt.fold(ifLeft = { System.err.println(it) }, ifRight = { println("Event sent.") }) | |
} | |
suspend fun deliverEvents( | |
eventBus: EventBus<ForIO>, | |
consumer: Consumer | |
) { | |
println("Test") | |
IO.fx { | |
!effect { println("Checking for events...") } | |
val event: Request = !eventBus.listen(Request::class) | |
!effect { println("return event") } | |
!effect { consumer.onRequest(event) } | |
!effect { println("delivered event") } | |
}.attempt() | |
.suspended() | |
.fold(ifLeft = { System.err.println(it) }, ifRight = { println("Event") }) | |
println("argl") | |
delay(Duration.ofSeconds(5L)) | |
deliverEvents(eventBus, consumer) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment