Skip to content

Instantly share code, notes, and snippets.

@derveloper
Last active December 12, 2017 23:28
Show Gist options
  • Save derveloper/6f2831e4cab66fa0a46023ddf90f9d5f to your computer and use it in GitHub Desktop.
Save derveloper/6f2831e4cab66fa0a46023ddf90f9d5f to your computer and use it in GitHub Desktop.
package cc.vileda.devmatch
import com.github.davidmoten.rx2.flowable.Transformers
import io.vertx.core.DeploymentOptions
import io.vertx.core.json.JsonObject
import io.vertx.reactivex.core.AbstractVerticle
import io.vertx.reactivex.core.Vertx
import java.util.concurrent.atomic.AtomicInteger
/**
 * Verticle that counts the messages it receives on its own event-bus address.
 *
 * The address is `foo.<id>`, where `id` is read from the deployment config.
 * The number of messages received is printed when the verticle is stopped.
 */
class FooActor : AbstractVerticle() {
    /** Number of messages received on this actor's address so far. */
    private val received = AtomicInteger(0)

    /** Announces the deployment and registers the counting consumer. */
    override fun start() {
        val actorId = config().getInteger("id")
        println("created $actorId deploymentId ${deploymentID()}")

        val address = "foo.$actorId"
        // The message body is ignored; only the fact that a message arrived is recorded.
        vertx.eventBus().consumer<String>(address) { _ ->
            received.incrementAndGet()
        }
    }

    /** Prints the final message count followed by the deployment id. */
    override fun stop() {
        val total = received.get()
        println("$total ${deploymentID()}")
    }
}
/**
 * Demo entry point: routes messages published on `foo` to per-id [FooActor] verticles.
 *
 * A state machine keeps the list of ids it has already seen. The first message for an
 * id triggers deployment of a [FooActor] configured with that id; messages for known
 * ids are passed downstream, where they are re-published on `foo.<id>`.
 *
 * After wiring the pipeline, a fixed number of message bursts is sent via [sendMsg],
 * the main thread sleeps for a moment, and Vert.x is closed.
 *
 * @param args command-line arguments (unused)
 */
fun main(args: Array<String>) {
    val vertx = Vertx.vertx()
    val atomicInteger = AtomicInteger(0)
    val atomicCounter = AtomicInteger(0)

    val burstCount = 8
    val shutdownDelayMillis = 2000L

    // State = ids for which a FooActor deployment has already been requested.
    val trans = Transformers
        .stateMachine()
        .initialState(listOf<Int>())
        .transition<JsonObject, JsonObject> { state, value, emitter ->
            val id = value.getInteger("id")
            if (state.contains(id)) {
                // NOTE(review): the id is added to the state as soon as deployment is
                // requested, so this branch can forward messages while the verticle is
                // still deploying — presumably nobody listens on foo.<id> yet; verify.
                emitter.onNext(value)
                state
            } else {
                vertx.deployVerticle(
                    FooActor::class.java.name,
                    DeploymentOptions().setConfig(JsonObject(mapOf("id" to id)))
                ) { deployment ->
                    if (deployment.succeeded()) {
                        vertx.eventBus().publish("foo.$id", value.encode())
                        // NOTE(review): this callback runs after the transition has
                        // returned. Whether the emitter still accepts items here depends
                        // on the state-machine implementation — confirm. If it does, the
                        // doOnNext stage below publishes this message a second time.
                        emitter.onNext(value)
                    } else {
                        // Previously a failed deployment was treated like a success.
                        System.err.println("failed to deploy FooActor for id $id: ${deployment.cause()}")
                    }
                }
                state.plus(id)
            }
        }.build()

    vertx.eventBus().consumer<String>("foo").toFlowable()
        .map { JsonObject(it.body()) }
        .compose(trans)
        .doOnNext { msg ->
            val id = msg.getInteger("id")
            vertx.eventBus().publish("foo.$id", msg.encode())
        }
        .subscribe(
            { },
            // Without an error consumer, any pipeline failure (e.g. a body that is not
            // valid JSON) would be reported as an unhandled error.
            { error -> System.err.println("foo pipeline terminated: $error") }
        )

    repeat(burstCount) { sendMsg(atomicInteger, atomicCounter, vertx) }

    // Everything above is asynchronous; wait briefly before shutting Vert.x down.
    Thread.sleep(shutdownDelayMillis)
    vertx.close()
}
/**
 * Publishes a burst of three JSON messages on the `foo` address.
 *
 * All three messages carry the same `id`, drawn once from [atomicInteger], and each
 * carries its own `p` value drawn from [atomicCounter]. When the drawn id is 3,
 * [atomicInteger] is reset to 0.
 *
 * @param atomicInteger source of the `id` field; advanced once per call
 * @param atomicCounter source of the `p` field; advanced once per published message
 * @param vertx Vert.x instance whose event bus is used for publishing
 */
private fun sendMsg(atomicInteger: AtomicInteger, atomicCounter: AtomicInteger, vertx: Vertx) {
    val actorId = atomicInteger.getAndIncrement()

    repeat(3) {
        val payload = JsonObject(mapOf("id" to actorId, "p" to atomicCounter.getAndIncrement()))
        vertx.eventBus().publish("foo", payload.encode())
    }

    // Wrap around after id 3 so later bursts reuse the earlier ids.
    if (actorId == 3) atomicInteger.set(0)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment