package com.softwaremill.mqperf | |
import com.softwaremill.mqperf.config.TestConfigOnS3 | |
import com.softwaremill.mqperf.mq.Mq | |
object Receiver extends App { | |
new TestConfigOnS3().whenChanged { testConfig => | |
println(s"Starting test (receiver) with config: $testConfig") | |
val mq = Mq.instantiate(testConfig) | |
val report = new ReportResults(testConfig.name) | |
val rr = new ReceiverRunnable( | |
mq, report, | |
testConfig.receiveMsgBatchSize | |
) | |
val threads = (1 to testConfig.receiverThreads).map { _ => | |
val t = new Thread(rr) | |
t.start() | |
t | |
} | |
threads.foreach(_.join()) | |
mq.close() | |
} | |
} | |
class ReceiverRunnable( | |
mq: Mq, | |
reportResults: ReportResults, | |
receiveMsgBatchSize: Int) extends Runnable { | |
private val mqReceiver = mq.createReceiver() | |
override def run() = { | |
val start = System.currentTimeMillis() | |
var lastReceived = System.currentTimeMillis() | |
var totalReceived = 0 | |
while ((System.currentTimeMillis() - lastReceived) < 60*1000L) { | |
val received = doReceive() | |
if (received > 0) { | |
lastReceived = System.currentTimeMillis() | |
} | |
totalReceived += received | |
} | |
reportResults.reportReceivingComplete(start, lastReceived, totalReceived) | |
mqReceiver.close() | |
} | |
private def doReceive() = { | |
val msgs = mqReceiver.receive(receiveMsgBatchSize) | |
val ids = msgs.map(_._1) | |
if (ids.size > 0) { | |
mqReceiver.ack(ids) | |
} | |
ids.size | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment