Skip to content

Instantly share code, notes, and snippets.

@jorgeuriarte
Last active August 29, 2015 14:22
Show Gist options
  • Save jorgeuriarte/f232f07c0f2a938efa14 to your computer and use it in GitHub Desktop.
Save jorgeuriarte/f232f07c0f2a938efa14 to your computer and use it in GitHub Desktop.
Trying to track down weird behaviour with Vertx. The code essentially reads a redis queue and translates each of its elements into a message on the event bus. The problem is that, once it has been running under load for some time, the bus starts reporting "failed" sends, even though those messages have already been checked as "received" (check the UUIDs in the logs, once the failu…
package nire.verticles;
// Event-bus address this verticle consumes from.
@groovy.transform.Field HANDLER_NAME = "nire.handlers.AsignarEjercicio"

println "Will listen at ${HANDLER_NAME}"

// Each incoming message gets its own worker; its 'msg' entry is processed
// and the sender is then answered with "Done".
vertx.eventBus().consumer(HANDLER_NAME) { incoming ->
    def payload = incoming.body().msg
    new DummyWorker().processElement(payload)
    incoming.reply("Done")
}
/**
 * Placeholder worker: accepts an element and does nothing with it.
 */
class DummyWorker {

    /**
     * Intentionally a no-op.
     *
     * @param text the element handed over by the consumer (ignored)
     * @return always null
     */
    def processElement(text) {
        return null
    }
}
#
# Fill the redis list "queue_exercises" with some thousands of elements.
# Run this *before* launching vertx, so the elements are already queued
# by the time the verticles start.
#
for i in $(seq 0 9); do
    parallel ./loadredis.sh ::: $(seq -w "02${i}000" "02${i}999")
done
#!/bin/sh
#
# Pushes one element onto the head of the redis list "queue_exercises".
# $1 is the suffix appended after the fixed "demouser##00" prefix.
#
# The value is quoted so the argument always reaches redis-cli as a single
# word, without word splitting or glob expansion.
redis-cli lpush queue_exercises "demouser##00${1}"
package nire.verticles;
import redis.clients.jedis.*
import groovy.transform.Field
import static groovy.json.JsonOutput.toJson
import static java.util.UUID.randomUUID
import nire.workers.*
import nire.verticles.*
// Connection pool settings: validate connections on borrow and on return,
// allow up to 500 connections, and fail instead of blocking when none is free.
JedisPoolConfig jedisSettings = new JedisPoolConfig()
jedisSettings.setTestOnBorrow(true)
jedisSettings.setTestOnReturn(true)
jedisSettings.setMaxTotal(500)
jedisSettings.setBlockWhenExhausted(false)

// Pool against the local redis on the default port, with a timeout of 2000,
// no password and database 0.
JedisPool pool = new JedisPool(jedisSettings, "localhost", Protocol.DEFAULT_PORT, 2000, null, 0)

// Announce the queues that will be polled.
println "Listening at following redis queues:"
for (queueName in queueNames) {
    println "--> ${queueName}"
}
// Once per second: borrow a redis connection, drain the queues through
// listen(), and always hand the connection back to the pool.
vertx.setPeriodic(1000) { id ->
    // The timer id is left-padded proportionally to its value so that the
    // output of different timers lands in different columns.
    def tag = id.toString().padLeft(id*11)
    Jedis instance = null
    try {
        // Borrow inside the try: the pool is configured not to block when
        // exhausted, so borrowing can throw and must be reported like any
        // other failure of this run.
        instance = pool.resource
        println "Redis listener ${tag} launched!"
        listen(instance)
    } catch (e) {
        // No 'log' object is defined in this script, so report through
        // println like the rest of the file does.
        println "Error en bucle principal: ${e}"
    } finally {
        println "listener done: ${tag}"
        // instance is still null when the borrow itself failed.
        instance?.close()
    }
}
// Maps each redis queue name to the event-bus address its elements are sent to.
@Field def QUEUES_HANDLERS = [(AsignarEjercicioWorker.QUEUE_NAME): 'nire.handlers.AsignarEjercicio']
/**
 * @return the redis queue names configured in QUEUES_HANDLERS, as a list
 */
def getQueueNames() {
    return QUEUES_HANDLERS.keySet().toList()
}
/**
 * Looks up the event-bus address registered for a queue.
 *
 * @param name redis queue name
 * @return the address mapped to that queue in QUEUES_HANDLERS, or null if none
 */
def handlerForQueue(name) {
    return QUEUES_HANDLERS.get(name)
}
/**
 * Pops elements from the configured redis queues and forwards each one to
 * the event-bus address mapped to its queue, until a pop comes back empty.
 *
 * Each message is a map with a freshly generated UUID under 'timestamp' and
 * the popped value under 'msg'. The outcome of every send is printed; a
 * failed send is only reported, nothing is pushed back to redis.
 *
 * @param instance redis connection used for the blocking pops
 * @throws Exception whatever the event-bus send throws, after logging it
 */
def listen(instance) {
    def QUEUE_TIMEOUT = 1
    def SEND_TIMEOUT = 300*1000
    def element
    // brpop yields [queueName, value]; a null or empty result is falsy in
    // Groovy and ends the loop, so no further emptiness check is needed.
    while (element = instance.brpop(QUEUE_TIMEOUT, *this.queueNames)) {
        def queue = element[0]
        def handler = handlerForQueue(queue)
        def msg = [timestamp: randomUUID() as String,
                   msg: element[1]]
        try {
            def options = [sendTimeout: SEND_TIMEOUT]
            vertx.eventBus().send(handler, msg, options) { reply ->
                if (reply.succeeded()) {
                    println "Received reply from '${handler}': ${msg}"
                } else {
                    // Include the cause so that timeouts, missing handlers and
                    // recipient failures can be told apart in the output.
                    println "No reply from '${handler}' to message '${msg.timestamp}'. Backfiring... (cause: ${reply.cause()})"
                }
            }
        } catch (Exception e) {
            println "El worker ${this.getClass().name} lanzó una EXCEPCION al procesar ${element}\n${e}"
            throw e
        }
    }
}
#!/bin/sh
#
# Runs nire/server/VertxReplyTest.groovy with vertx, from src/main/groovy,
# with the jars listed in LIBS on the classpath.
#
export VERTX_OPTS="-Dlog4j.ignoreTCL=true"
# Classpath entries are relative to src/main/groovy, where vertx is started.
# NOTE(review): the "json-../../../lib-2.3-jdk15.jar" entry looks mangled
# (json-lib-2.3-jdk15.jar?) - confirm against the contents of vertxlib.
LIBS=../../../vertxlib/annotations-2.0.3.jar:../../../vertxlib/apns-1.0.0.Beta5.jar:../../../vertxlib/commons-beanutils-1.8.0.jar:../../../vertxlib/commons-codec-1.6.jar:../../../vertxlib/commons-collections-3.2.1.jar:../../../vertxlib/commons-httpclient-3.1.jar:../../../vertxlib/commons-lang-2.4.jar:../../../vertxlib/commons-logging-1.1.1.jar:../../../vertxlib/commons-pool2-2.0.jar:../../../vertxlib/ezmorph-1.0.6.jar:../../../vertxlib/gpars-1.2.1.jar:../../../vertxlib/groovy-all-2.3.10.jar:../../../vertxlib/groovy-xmlrpc-0.7.jar:../../../vertxlib/hamcrest-core-1.3.jar:../../../vertxlib/http-builder-0.7.jar:../../../vertxlib/httpclient-4.2.1.jar:../../../vertxlib/httpcore-4.2.1.jar:../../../vertxlib/jackson-annotations-2.4.0.jar:../../../vertxlib/jackson-core-2.4.3.jar:../../../vertxlib/jackson-databind-2.4.3.jar:../../../vertxlib/jedis-2.6.2.jar:../../../vertxlib/joda-time-2.3.jar:../../../vertxlib/json-../../../lib-2.3-jdk15.jar:../../../vertxlib/jsr166y-1.7.0.jar:../../../vertxlib/junit-4.13-SNAPSHOT.jar:../../../vertxlib/log4j-1.2.17.jar:../../../vertxlib/multiverse-core-0.7.0.jar:../../../vertxlib/mysql-connector-java-5.1.6.jar:../../../vertxlib/nekohtml-1.9.16.jar:../../../vertxlib/netty-all-4.0.25.Final.jar:../../../vertxlib/scribe-1.3.5.jar:../../../vertxlib/signpost-commonshttp4-1.2.1.2.jar:../../../vertxlib/signpost-core-1.2.1.2.jar:../../../vertxlib/slf4j-api-1.7.2.jar:../../../vertxlib/smack-3.1.0.jar:../../../vertxlib/vertx-core-3.0.0-SNAPSHOT.jar:../../../vertxlib/vertx-lang-groovy-3.0.0-SNAPSHOT.jar:../../../vertxlib/xercesImpl-2.9.1.jar:../../../vertxlib/xml-apis-1.3.04.jar:../../../vertxlib/xml-resolver-1.2.jar
# Location of the vertx launcher; export VERTX_BIN to use another install.
VERTX_BIN=${VERTX_BIN:-/Users/jorge/Downloads/vert.x-3.0.0-SNAPSHOT/bin/vertx}
# pushd/popd are bash builtins and are not guaranteed under /bin/sh, so the
# directory change is confined to a subshell instead. Abort if it fails
# rather than launching vertx from the wrong directory.
(
    cd src/main/groovy || exit 1
    "$VERTX_BIN" run nire/server/VertxReplyTest.groovy -cp "$LIBS"
)
package nire.server;
import io.vertx.groovy.core.Vertx
def vertx = Vertx.vertx()

// Deploy three worker instances of the consumer verticle.
// deployVerticle is asynchronous, so the blocking future is resolved from
// its completion handler. This way the message below is printed only once
// the deployment has really finished, and a failure is reported too.
vertx.executeBlocking({ future ->
    vertx.deployVerticle('groovy:nire/verticles/DummyVerticle.groovy', [worker:true, instances:3]) { deployment ->
        if (deployment.succeeded()) {
            future.complete("deployed")
        } else {
            future.fail(deployment.cause())
        }
    }
}, { res ->
    if (res.succeeded()) {
        println "Func verticles deployed!"
    } else {
        println "Func verticles failed to deploy: ${res.cause()}"
    }
})
// Space-separated deployment ids of the tech verticles deployed so far.
def techVerticles = ""

// Appends the id carried by a deployment result to techVerticles.
def saveID = { id ->
    techVerticles += " ${id.result().toString()}"
}

// Two seconds after start-up, deploy two worker instances of the redis bridge.
vertx.setTimer(2000) {
    vertx.executeBlocking({ future ->
        println "Deploying tech verticles..."
        vertx.deployVerticle('groovy:nire/verticles/RedisVertxTestBridge.groovy', [worker:true, instances:2]) { deployment ->
            // The future has to be resolved explicitly, otherwise the result
            // handler below is never invoked.
            if (deployment.succeeded()) {
                saveID(deployment)
                future.complete("deployed")
            } else {
                future.fail(deployment.cause())
            }
        }
    }, { res ->
        if (res.succeeded()) {
            println "Tech verticles deployed!"
        } else {
            println "Tech verticles failed to deploy: ${res.cause()}"
        }
    })
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment