Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save soywiz/03bf0e11a591e9739ac73a4290d0c739 to your computer and use it in GitHub Desktop.
Save soywiz/03bf0e11a591e9739ac73a4290d0c739 to your computer and use it in GitHub Desktop.
// Depends on code defined here (related to EventLoop and promise implementation):
// https://gist.github.com/soywiz/a2a26b9375f1f85048ab3ce7ffdb5501
// More examples here: https://github.com/kotlin-es/coroutine-examples/blob/master/src/example-async-generator.kt
import java.util.*
/**
 * Demo entry point: runs an asynchronous producer/consumer example inside `EventLoop.main`.
 *
 * The producer ([readUsers], declared locally) emits four [User] values, awaiting
 * `waitAsync(0.3.seconds)` before each one. The consumer block then:
 *  1. prints every user via `eachAsync { ... }.await()`,
 *  2. prints every user again via the `each` shorthand,
 *  3. starts two aggregations (sum of all ages, sum of ages `>= 10`) before awaiting either.
 *
 * @param args command-line arguments; not used by this example.
 */
fun main(args: Array<String>) = EventLoop.main {
    // Asynchronous producer.
    fun readUsers() = generateAsync<User> {
        // This could read data results from disk or a socket.
        for (n in 0 until 4) {
            waitAsync(0.3.seconds).await()
            emit(User(name = "test$n", age = n * 5))
        }
    }

    async<Unit> {
        // Consumer: explicit eachAsync + await.
        readUsers().eachAsync { user ->
            println(user)
        }.await()
        println("----")

        // Consumer: `each` is the eachAsync+await alias available inside the async block.
        readUsers().each {
            println(it)
        }
        println("----")

        // Both pipelines are created before the first await, so they are in flight together.
        val sumPromise = readUsers().mapAsync { it.age }.sumAsync()
        val sumGreatOrEqualThan10Promise = readUsers().filterAsync { it.age >= 10 }.mapAsync { it.age }.sumAsync()
        println("Parallelized:")
        println("All ages summed: " + sumPromise.await())
        // The filter above is inclusive (>= 10), so the label must say so as well.
        println("All ages (greater than or equal to 10) summed: " + sumGreatOrEqualThan10Promise.await())
    }
}
/**
 * Sample record produced by the example stream.
 *
 * @property name the user's name.
 * @property age the user's age.
 */
data class User(
    val name: String,
    val age: Int
)
/**
 * Builds an [AsyncStream] whose values are produced by the coroutine [routine].
 *
 * A fresh [AsyncStreamController] is created and passed to [routine] to obtain the
 * coroutine's initial continuation; that continuation is resumed once with `Unit` to
 * start the coroutine, and the controller's stream is returned.
 *
 * @param routine coroutine body, invoked with the controller as receiver.
 * @return the stream owned by the newly created controller.
 */
fun <T> generateAsync(coroutine routine: AsyncStreamController<T>.() -> Continuation<Unit>): AsyncStream<T> {
    val streamController = AsyncStreamController<T>()
    // Create the coroutine and kick it off right away.
    routine(streamController).resume(Unit)
    return streamController.stream
}
/**
 * Controller used as the receiver of a `generateAsync` coroutine body.
 *
 * It owns an [AsyncStream.Emitter] and exposes its [stream]; the coroutine pushes values
 * with [emit] and may suspend on promises with [await] or on other streams with [each].
 */
class AsyncStreamController<T> {
    private val emitter = AsyncStream.Emitter<T>()

    /** Stream that receives everything passed to [emit]. */
    val stream = emitter.stream

    /** Forwards [value] to the underlying emitter. */
    fun emit(value: T) = emitter.emit(value)

    /**
     * Suspends on this promise: the continuation [c] is resumed with the resolved value,
     * or resumed with the exception when the promise is rejected.
     */
    suspend fun <T> Promise<T>.await(c: Continuation<T>) {
        then(
            resolved = { value -> c.resume(value) },
            rejected = { error -> c.resumeWithException(error) }
        )
    }

    /** Registers [handler] with `eachAsync` and suspends on the promise it returns. */
    suspend fun AsyncStream<T>.each(handler: (T) -> Unit, c: Continuation<Unit>) {
        eachAsync(handler).await(c)
    }

    /** Coroutine completion hook: closes the emitter. */
    operator fun handleResult(v: Unit, c: Continuation<Nothing>) {
        emitter.close()
    }

    /** Coroutine failure hook: rejects the emitter's deferred with [t]. */
    operator fun handleException(t: Throwable, c: Continuation<Nothing>) {
        emitter.deferred.reject(t)
    }
}
/**
 * Push-style stream of values of type [T].
 *
 * Values are fed in through an [Emitter]; consumers register a [Handler] with
 * [listenAsync]/[eachAsync] and receive a promise backed by the stream's deferred, which
 * the emitter resolves on [Emitter.close] (or which is rejected by whoever holds the
 * emitter's `deferred`).
 *
 * Values emitted while no handler is registered are kept in a pending queue and are
 * delivered, in order, as soon as the first handler is registered.
 */
class AsyncStream<T> {
    typealias Handler = (T) -> Unit

    private val deferred = Promise.Deferred<Unit>()
    private val handlers = arrayListOf<Handler>()

    // Values emitted before any handler existed. Owned by the stream (not the emitter) so
    // that listenAsync can flush them; otherwise they would stay stranded until the next
    // emit, or forever if the producer had already finished.
    private val pending = LinkedList<T>()

    /** Delivers every pending value to all registered handlers; no-op without handlers. */
    private fun flushPending() {
        if (handlers.isEmpty()) return
        while (pending.isNotEmpty()) {
            val item = pending.removeFirst()
            for (handler in handlers) handler(item)
        }
    }

    /** Write side of an [AsyncStream]: creates the stream and pushes values into it. */
    class Emitter<T> {
        val stream = AsyncStream<T>()
        val deferred = stream.deferred

        /** Queue of values not yet delivered; shared with [stream]. */
        val buffer = stream.pending

        /** Queues [value] and delivers all queued values if at least one handler exists. */
        fun emit(value: T) {
            buffer += value
            stream.flushPending()
        }

        /** Resolves the stream's deferred with `Unit`. */
        fun close() {
            deferred.resolve(Unit)
        }
    }

    /** Alias of [listenAsync]. */
    fun eachAsync(handler: Handler) = listenAsync(handler)

    /**
     * Registers [handler] for emitted values. Values that were queued before any handler
     * existed are delivered to it immediately.
     *
     * @return the promise of the stream's deferred.
     */
    fun listenAsync(handler: Handler): Promise<Unit> {
        handlers += handler
        flushPending()
        return deferred.promise
    }

    /**
     * Returns a new stream that emits `map(value)` for every value of this stream.
     * The new stream is closed when this stream's promise resolves, and its deferred is
     * rejected with the same error when this stream's promise is rejected.
     */
    fun <T2> mapAsync(map: (T) -> T2): AsyncStream<T2> {
        val emitter = AsyncStream.Emitter<T2>()
        this.listenAsync {
            emitter.emit(map(it))
        }.then(
            resolved = { emitter.close() },
            // Without this the derived stream would never settle when the source fails.
            rejected = { emitter.deferred.reject(it) }
        )
        return emitter.stream
    }

    /**
     * Returns a new stream that re-emits only the values for which [filter] returns true.
     * Completion and failure of this stream are forwarded to the new stream.
     */
    fun filterAsync(filter: (T) -> Boolean): AsyncStream<T> {
        val emitter = AsyncStream.Emitter<T>()
        this.listenAsync {
            if (filter(it)) emitter.emit(it)
        }.then(
            resolved = { emitter.close() },
            rejected = { emitter.deferred.reject(it) }
        )
        return emitter.stream
    }

    /**
     * Accumulates every value of this stream into a single result, starting from [initial]
     * and applying [fold] for each value.
     *
     * @return a promise resolved with the accumulated result when this stream's promise
     *         resolves, or rejected with the same error when it is rejected.
     */
    fun <R> foldAsync(initial: R, fold: (R, T) -> R): Promise<R> {
        val out = Promise.Deferred<R>()
        var result = initial
        this.listenAsync {
            result = fold(result, it)
        }.then(
            resolved = { out.resolve(result) },
            // Propagate failure so awaiting the fold does not hang forever.
            rejected = { out.reject(it) }
        )
        return out.promise
    }
}
/**
 * Adds up every value of this stream.
 *
 * @return a promise for the total, computed with `foldAsync` starting from 0.
 */
//fun <T : Number> AsyncStream<T>.sumAsync(): Promise<T> {
fun AsyncStream<Int>.sumAsync(): Promise<Int> =
    foldAsync(0) { total, value -> total + value }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment