AMQP Gatling Test (Gatling 3.1.2, Scala 2.12.8, com.rabbitmq:amqp-client:5.3.0)
package simulations | |
import io.gatling.commons.validation.Validation | |
import io.gatling.core.Predef._ | |
import io.gatling.core.action.{RequestAction} | |
import io.gatling.core.action.builder.ActionBuilder | |
import io.gatling.core.session.{Expression, Session} | |
import io.gatling.core.structure.ScenarioContext | |
import io.gatling.core.action.Action | |
import scala.concurrent.duration._ | |
import scala.language.postfixOps | |
import com.rabbitmq.client.ConnectionFactory | |
import com.typesafe.scalalogging.StrictLogging | |
import io.gatling.commons.stats.OK | |
import io.gatling.commons.util.Clock | |
import io.gatling.core.stats.StatsEngine | |
import io.gatling.core.util.NameGen | |
import io.gatling.commons.validation._ | |
/**
 * Gatling simulation that runs a single-step scenario: every injected user
 * executes one [[AmqpSend]] action carrying the text "Hello, World".
 *
 * Users are injected at a rate that ramps from 1 to 60 users per second over
 * 180 seconds.
 */
class RabbitTest extends Simulation {

  /**
   * Creates an action builder whose built action is an [[AmqpSend]] for `message`.
   *
   * @param message text handed to the `AmqpSend` action
   */
  def amqpSend(message: String) = new ActionBuilder {
    def build(ctx: ScenarioContext, next: Action): Action = new AmqpSend(message, ctx, next)
  }

  val scn = scenario("AMQP protocol test").exec(amqpSend("Hello, World"))

  setUp(
    scn.inject(
      rampUsersPerSec(1).to(60).during(180.seconds)
    )
  )
}
/**
 * Brackets a computation with a `before` hook and an `after` hook.
 *
 * @param before callback invoked first, before the wrapped body
 * @param after  callback invoked last, after the wrapped body has completed
 */
class Around(before: () => Unit, after: () => Unit) {

  /**
   * Runs `before`, evaluates the by-name body `f` exactly once (its result is
   * discarded), then runs `after`.
   *
   * There is no try/finally here: if `before` or `f` throws, `after` is not
   * invoked and the exception propagates to the caller.
   */
  def apply(f: => Any): Unit = {
    before.apply()
    f
    after.apply()
  }
}
/**
 * Logging helper for AMQP actions, built on the `logger` provided by
 * `StrictLogging`.
 */
trait AmqpLogging extends StrictLogging {

  /**
   * Logs a summary line at DEBUG level and the message payload at TRACE level.
   *
   * @param text summary line; passed by name
   * @param msg  message payload, logged as-is
   */
  def logMessage(text: => String, msg: String): Unit = {
    logger.debug(text)
    // `msg` is already a String: pass it straight through instead of calling
    // `toString`, which was redundant and would throw on a null payload.
    logger.trace(msg)
  }
}
/**
 * Gatling request action that publishes `message` to a RabbitMQ exchange and
 * then records the request in the stats engine before handing the session to
 * `next`.
 *
 * @param message text payload to publish
 * @param ctx     scenario context; supplies the clock, stats engine and configuration
 * @param next    action that receives the session after the publish has been logged
 */
class AmqpSend(val message: String, val ctx: ScenarioContext, val next: Action)
  extends RequestAction with AmqpLogging with NameGen {

  override val name: String = genName("amqp publish")
  override val requestName: Expression[String] = "amqp publish"
  override def clock: Clock = ctx.coreComponents.clock
  override def statsEngine: StatsEngine = ctx.coreComponents.statsEngine

  /**
   * Opens a fresh connection and channel to the broker on localhost:5672,
   * declares the durable direct exchange "amqp.direct.1", publishes `message`
   * with routing key "routing.key.1", and closes the channel and connection.
   *
   * The channel and the connection are closed in `finally` blocks so that a
   * failure while declaring the exchange or publishing does not leak them.
   * Any exception raised by the client propagates to the caller.
   */
  def send(): Unit = {
    val exchange = "amqp.direct.1"
    val routingKey = "routing.key.1"

    val factory = new ConnectionFactory()
    factory.setUsername("guest")
    factory.setPassword("guest")
    factory.setHost("localhost")
    factory.setVirtualHost("/")
    factory.setPort(5672)

    val conn = factory.newConnection()
    try {
      val channel = conn.createChannel()
      try {
        channel.exchangeDeclare(exchange, "direct", true)
        // Encode explicitly as UTF-8 rather than with the platform default
        // charset, so the bytes on the wire do not depend on the host's locale.
        val messageBodyBytes = message.getBytes(java.nio.charset.StandardCharsets.UTF_8)
        // `null` = no message properties (Java API).
        channel.basicPublish(exchange, routingKey, null, messageBodyBytes)
      } finally {
        channel.close()
      }
    } finally {
      conn.close()
    }
  }

  /**
   * Builds the before/after hooks via [[aroundSend]] and, if that succeeds,
   * runs [[send]] between them.
   *
   * @param requestName resolved request name used when logging the response
   * @param session     session of the virtual user executing this action
   * @return the validation produced by `aroundSend`, mapped to `Unit`
   */
  override def sendRequest(requestName: String, session: Session): Validation[Unit] =
    aroundSend(requestName, session).map { around =>
      around(send())
    }

  /**
   * Creates the [[Around]] wrapper for one publish.
   *
   * `before` emits the debug/trace log lines when debug logging is enabled.
   * `after` logs an OK response to the stats engine and forwards the session
   * to `next`; because `Around` does not run `after` when the body throws,
   * neither happens here if `send()` fails.
   *
   * @param requestName resolved request name used when logging the response
   * @param session     session of the virtual user executing this action
   * @return a successful validation holding the wrapper
   */
  protected def aroundSend(requestName: String, session: Session): Validation[Around] = {
    new Around(
      before = () => {
        if (logger.underlying.isDebugEnabled) {
          logMessage("Message sent", message)
        }
      },
      after = () => {
        // NOTE(review): the start timestamp is the session's start date, not
        // the moment just before send(), so the recorded duration covers
        // everything since the session began - confirm this is intended.
        // NOTE(review): `configuration.resolve` is kept as in the original;
        // presumably it just evaluates/returns its argument - TODO confirm.
        ctx.coreComponents.configuration.resolve(
          {
            val now = clock.nowMillis
            statsEngine.logResponse(session, requestName, session.startDate, now, OK, None, None)
          }
        )
        next ! session
      }
    ).success
  }
}
This comment has been minimized.
This comment has been minimized.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This comment has been minimized.
Report Example