Skip to content

Instantly share code, notes, and snippets.

@mgodave
Created November 14, 2013 04:35
Show Gist options
  • Save mgodave/7461413 to your computer and use it in GitHub Desktop.
Save mgodave/7461413 to your computer and use it in GitHub Desktop.
Akka Futures API in Kotlin (for fun)
import com.google.common.util.concurrent.ListeningExecutorService
import com.google.common.util.concurrent.MoreExecutors
import java.util.concurrent.Executors
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.FutureCallback
import com.google.common.util.concurrent.SettableFuture
import java.util.ArrayList
/**
 * Holder for the shared executor that [future] uses as its default `executor` argument.
 */
object KFutures {
    // Backing cached thread pool, wrapped below so that submitted tasks yield ListenableFutures.
    private val threadPool = Executors.newCachedThreadPool()

    public val executor: ListeningExecutorService =
            MoreExecutors.listeningDecorator(threadPool) as ListeningExecutorService
}
/**
 * Applies [f] to the result of this future by delegating to `Futures.transform`.
 *
 * @param f function applied to the value produced by this future
 * @return the future returned by `Futures.transform`, cast to `ListenableFuture<O>`
 */
fun <I, O> ListenableFuture<I>.map(f: (x: I) -> O): ListenableFuture<O> {
    // The cast to a nullable function type selects the plain-function overload of transform.
    val transformer = f as ((I?) -> O?)
    val mapped = Futures.transform(this, transformer)
    return mapped as ListenableFuture<O>
}
/**
 * Chains this future with a future-returning function by delegating to `Futures.transform`.
 *
 * @param f function that receives this future's value and returns the next future
 * @return the future returned by `Futures.transform`, cast to `ListenableFuture<O>`
 */
fun <I, O> ListenableFuture<I>.flatMap(f: (x: I) -> ListenableFuture<O>): ListenableFuture<O> {
    // The cast to a future-returning function type selects the asynchronous overload of transform.
    val asyncTransformer = f as ((I?) -> ListenableFuture<O>?)
    val chained = Futures.transform(this, asyncTransformer)
    return chained as ListenableFuture<O>
}
/**
 * Submits [f] to [executor] and returns the resulting future.
 *
 * @param executor executor the work is submitted to; defaults to [KFutures.executor]
 * @param f the computation to run
 * @return the future returned by `executor.submit`
 */
fun <T> future(executor: ListeningExecutorService = KFutures.executor, f: () -> T): ListenableFuture<T> {
    val task = callable(f)
    return executor.submit(task)
}
/**
 * Creates a new, not yet completed [SettableFuture] via `SettableFuture.create`.
 *
 * @return the freshly created settable future
 */
fun <T> promise(): SettableFuture<T> {
    val pending = SettableFuture.create<T>()
    return pending as SettableFuture<T>
}
/**
 * Wraps the value [v] in a future by delegating to `Futures.immediateFuture`.
 *
 * @param v the value the returned future carries
 * @return the future returned by `Futures.immediateFuture`
 */
fun <T> promise(v: T): ListenableFuture<T> {
    val completed = Futures.immediateFuture<T>(v)
    return completed as ListenableFuture<T>
}
/**
 * Wraps the throwable [t] in a future by delegating to `Futures.immediateFailedFuture`.
 *
 * @param t the failure the returned future carries
 * @return the future returned by `Futures.immediateFailedFuture`
 */
fun <T> promise(t: Throwable): ListenableFuture<T> {
    val failed = Futures.immediateFailedFuture<T>(t)
    return failed as ListenableFuture<T>
}
/**
 * Registers [f] on this future through `Futures.addCallback`; only the success
 * notification is forwarded.
 *
 * @param f callback that receives the (possibly null) result
 */
fun <I> ListenableFuture<I>.onSuccess(f: (x: I?) -> Unit): Unit {
    val successOnly = object : FutureCallback<I> {
        override fun onFailure(t: Throwable?) {
            // Failures are intentionally not forwarded by this helper.
        }

        override fun onSuccess(result: I?) {
            f(result)
        }
    }
    Futures.addCallback(this, successOnly)
}
/**
 * Registers [f] on this future through `Futures.addCallback`; only the failure
 * notification is forwarded.
 *
 * @param f callback that receives the failure
 */
fun <I> ListenableFuture<I>.onFailure(f: (x: Throwable) -> Unit): Unit {
    val failureOnly = object : FutureCallback<I> {
        override fun onSuccess(result: I?) {
            // Successful results are intentionally not forwarded by this helper.
        }

        override fun onFailure(t: Throwable?) {
            // The callback hands over a nullable throwable; f expects a non-null one.
            f(t as Throwable)
        }
    }
    Futures.addCallback(this, failureOnly)
}
/**
 * Runs the side-effecting callback [f] once this future completes and returns a new
 * future that carries the same outcome as this one.
 *
 * [f] is invoked exactly once: with the result on success, or with the failure on
 * error. The returned future is completed in a `finally` block, so an exception
 * thrown by [f] cannot change the outcome that is passed on and cannot cause [f]
 * to be invoked a second time.
 *
 * The failure handed to [f] and to the returned future is whatever `FutureCallback`
 * reports, rather than the wrapper exception thrown by `get()`.
 *
 * @param f callback receiving either the result value or the failure
 * @return a future completed with this future's value or failure after [f] has run
 */
fun <I> ListenableFuture<I>.andThen(f: (x: Any?) -> Unit): ListenableFuture<I> {
    val p = promise<I>()
    Futures.addCallback(this, object : FutureCallback<I> {
        override fun onSuccess(result: I?) {
            try {
                f(result)
            } finally {
                // Pass the original value on even if the callback itself threw.
                // NOTE(review): an exception from f propagates to the code running this
                // callback (presumably Guava logs it) - confirm for the Guava version in use.
                p.set(result)
            }
        }

        override fun onFailure(t: Throwable?) {
            try {
                f(t)
            } finally {
                // Pass the original failure on even if the callback itself threw.
                p.setException(t)
            }
        }
    })
    return p
}
/**
 * Combines the futures in [f] into a single future of a list by delegating to
 * `Futures.allAsList`.
 *
 * @param f the futures to combine
 * @return the future returned by `Futures.allAsList`
 */
fun <T> sequence(f: Iterable<ListenableFuture<T>>): ListenableFuture<List<T>> {
    val combined = Futures.allAsList(f)
    return combined as ListenableFuture<List<T>>
}
//fun <T> sequence(vararg f: Iterable<ListenableFuture<T>>): ListenableFuture<List<T>> {
// return Futures.allAsList(f) as ListenableFuture<List<T>>
//}
/**
 * Pairs the result of this future with the result of [o].
 *
 * Built from [flatMap] on this future and [map] on [o].
 *
 * @param o the future supplying the second component of the pair
 * @return a future of `Pair(thisResult, otherResult)`
 */
fun <T, U> ListenableFuture<T>.zip(o: ListenableFuture<U>): ListenableFuture<Pair<T, U>> {
    return this.flatMap { left ->
        o.map { right -> Pair(left, right) }
    }
}
/**
 * Uses [f] as the fallback future for this one, via [recoverWith].
 *
 * @param f the future supplied as fallback; the failure passed by [recoverWith] is ignored
 * @return the future produced by [recoverWith]
 */
fun <T> ListenableFuture<T>.fallbackTo(f: ListenableFuture<T>): ListenableFuture<T> {
    return this.recoverWith { ignored -> f }
}
/**
 * Uses [f] to compute a replacement value from a failure, via [recoverWith].
 *
 * @param f function mapping the failure to a replacement value
 * @return the future produced by [recoverWith]
 */
fun <T> ListenableFuture<T>.recover(f: (t: Throwable?) -> T): ListenableFuture<T> {
    return this.recoverWith { cause ->
        // Wrap the replacement value so it fits the future-returning fallback shape.
        val replacement = f(cause)
        promise(replacement)
    }
}
/**
 * Applies [f] to every element of [l] and combines the resulting futures with [sequence].
 *
 * @param l the input elements
 * @param f function producing a future for each element
 * @return the combined future, cast to `ListenableFuture<Iterable<U>>`
 */
fun <T, U> traverse(vararg l: T, f: (x: T) -> ListenableFuture<U>): ListenableFuture<Iterable<U>> {
    val pending = l.map(f)
    return sequence(pending) as ListenableFuture<Iterable<U>>
}
/**
 * Applies [f] to every element of [l] and combines the resulting futures with [sequence].
 *
 * @param l the input elements
 * @param f function producing a future for each element
 * @return the combined future, cast to `ListenableFuture<Iterable<U>>`
 */
fun <T, U> traverse(l: Iterable<T>, f: (x: T) -> ListenableFuture<U>): ListenableFuture<Iterable<U>> {
    val futures = ArrayList<ListenableFuture<U>>()
    // Collect one future per element, in iteration order.
    for (item in l) {
        futures.add(f(item))
    }
    return sequence(futures) as ListenableFuture<Iterable<U>>
}
/**
 * Installs [f] as the fallback for this future by delegating to `Futures.withFallback`.
 *
 * @param f function mapping the failure to a replacement future
 * @return the future returned by `Futures.withFallback`
 */
fun <T> ListenableFuture<T>.recoverWith(f: (t: Throwable?) -> ListenableFuture<T>): ListenableFuture<T> {
    val guarded = Futures.withFallback(this, f)
    return guarded as ListenableFuture<T>
}
/**
 * Small demo of the helpers in this file: chains [map]/[flatMap], combines two
 * futures, zips two completed futures, and prints each result.
 *
 * The shared executor is shut down in a `finally` block so that it is released
 * even when one of the blocking `get()` calls throws.
 */
fun main(args: Array<String>): Unit {
    try {
        // NOTE(review): the explicit `as Int` casts look redundant; presumably they
        // help type inference in this Kotlin dialect - confirm before removing.
        val f = future {
            1 as Int
        }.map {
            it.plus(3) as Int
        }.map {
            it.plus(50) as Int
        }.flatMap { sum ->
            future {
                sum.plus(100)
            }
        }
        f.onSuccess {
            when (it) {
                is Int -> it
                else -> Unit
            }
        }
        println(f.get())

        val f1 = future {
            "Hello" + "World"
        }
        val f2 = promise<Int>()
        f2.set(3)
        val f3 = f1.flatMap { x ->
            f2.map { y ->
                x.length * y
            }
        }
        val result = f3.get()
        println(result)

        val i = promise(1)
        val j = promise(2)
        val m = i.zip(j)
        println(m.get())
    } finally {
        // Always release the shared executor, whether or not the demo succeeded.
        KFutures.executor.shutdownNow()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment