Skip to content

Instantly share code, notes, and snippets.

@mgodave
Last active October 15, 2018 09:29
Show Gist options
  • Save mgodave/7469931 to your computer and use it in GitHub Desktop.
Save mgodave/7469931 to your computer and use it in GitHub Desktop.
Kotlin actor
package actors
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import com.google.common.util.concurrent.MoreExecutors
import java.util.concurrent.Executors
import java.util.ArrayList
import java.util.concurrent.LinkedBlockingQueue
import com.google.common.util.concurrent.ListeningExecutorService
import java.util.concurrent.ThreadFactory
import com.google.common.util.concurrent.ThreadFactoryBuilder
import futures.*
import java.util.concurrent.atomic.AtomicBoolean
object Actors {
private val threadFactory: ThreadFactory? = ThreadFactoryBuilder().setDaemon(true)?.build()
private val pool = Executors.newCachedThreadPool(threadFactory as ThreadFactory)
public val executor: ListeningExecutorService = MoreExecutors.listeningDecorator(pool) as ListeningExecutorService;
}
inline fun <V> ListeningExecutorService.submit(f: () -> V): ListenableFuture<V> = submit(callable(f))
abstract class Actor() {
private data class Message(val m: Any?, val p: SettableFuture<Any>)
private val running = AtomicBoolean(false)
private val mailbox = LinkedBlockingQueue<Message>()
abstract fun receive(m: Any?): Any?
fun send(m: Any?): ListenableFuture<Any> {
val result = promise<Any>()
mailbox.put(Message(m, result))
dispatch()
return result
}
private fun dispatch() {
if (!running.get() && !mailbox.isEmpty()) {
if (running.compareAndSet(false, true)) {
val messages = ArrayList<Message>()
val num = mailbox.drainTo(messages, 10)
println(num)
Actors.executor submit {
for (msg in messages) {
try {
msg.p.set(receive(msg.m))
} catch (t: Throwable) {
msg.p.setException(t)
}
}
running.set(false)
dispatch()
}
}
}
}
}
fun actor(f: (Any?) -> Any?): Actor {
return object: Actor() {
override fun receive(m: Any?): Any? {
return f(m)
}
}
}
data class Stuff(val i: Int)
fun ringActor(next: Actor? = null): Actor {
return actor { m ->
when (next) {
null -> println(m)
else -> next send m
}
}
}
fun runRing() {
val last = ringActor()
var first: Actor = last
for (i in 1..1000) {
first = ringActor(first)
}
for (i in 1..100) {
first send i
}
}
data class PingMessage(val a: Actor)
data class PongMessage(val a: Actor)
object Ping: Actor() {
override fun receive(m: Any?): Unit {
when (m) {
is PongMessage -> {
println("Pong")
m.a send PingMessage(this)
}
else -> throw IllegalStateException()
}
}
}
object Pong: Actor() {
override fun receive(m: Any?): Unit {
when (m) {
is PingMessage -> {
println("Ping")
m.a send PongMessage(this)
}
else -> throw IllegalStateException()
}
}
}
fun runPingPong() {
Pong send PingMessage(Ping)
Thread.sleep(10000)
}
fun main(args: Array<String>): Unit {
val a = actor {
when (it) {
is Stuff -> "Stuff ${it.i}"
else -> "Nothing"
}
}
val f = traverse(1..100) {(i: Int) -> a send Stuff(i) }
f andThen {
val results = it as Iterable<Any?>
results.forEach { println(it) }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment