Skip to content

Instantly share code, notes, and snippets.

View vhutov's full-sized avatar

Vladyslav Hutov vhutov

View GitHub Profile
trait Message {
def id: String
}
...//set up protocols
val action = send(msg) //here goes our dsl
.via(/*put here producer*/)
.receive(/*put here consumer*/)
val scn = scenario("custom plugin") //here goes gatling
.feeder(feed)
.exec(action)
trait Consumer {
def name: String
def await(msg: Message): Future[Unit]
def close(): Unit
}
trait RequestActionBuilder[V <: Message] {
type PK <: ProtocolKey
def key: PK
def build(ctx: ScenarioContext, components: PK#Components): Producer[V]
}
class KafkaRequestActionBuilder[K, V <: Message](
topic: String, //this parameters will be provided when building KafkaPublisher
keyExtractor: V => Option[K]) extends RequestActionBuilder[V] {
override type PK = KafkaProtocol.KafkaProtocolKey.type
override def key = KafkaProtocol.KafkaProtocolKey
override def build(ctx: ScenarioContext, components: KafkaComponents): Producer[V] = {
val producer = new KafkaProducer[K, V](components.protocol.properties.asJava)
new KafkaPublisher[K, V](producer, topic)(keyExtractor)
class GenericActionBuilder[V <: Message](
attributes: Attributes[V],
requestActionBuilder: RequestActionBuilder[V],
responseActionBuilder: ResponseActionBuilder
) extends ActionBuilder with LazyLogging {
override def build(ctx: ScenarioContext, next: Action): Action = {
import ctx._
val requestProtocolComponents = protocolComponentsRegistry.components(requestActionBuilder.key)
class GenericAction[V <: Message](
val coreComponents: CoreComponents,
val attributes: Attributes[V],
val publisher: Publisher[V],
val consumer: Consumer,
val throttled: Boolean,
val next: Action
) extends ExitableAction with NameGen {
val statsEngine = coreComponents.statsEngine
class JmsManagerActor(messageParser: MessageParser) extends Actor {
def receive: Receive = {
case SubscribeForMessage(id) if isAlreadyReceived(id) =>
msgsBuffer.remove(id)
sender ! Ack
case SubscribeForMessage(id) =>
subscriptionMap.update(id, sender())
object KafkaProtocolBuilder {
implicit def toProtocol(builder: KafkaProtocolBuilder): KafkaProtocol = builder.build
def apply(configuration: GatlingConfiguration): KafkaProtocolBuilder =
KafkaProtocolBuilder(KafkaProtocol(configuration))
}
case class KafkaProtocolBuilder(kafkaProtocol: KafkaProtocol) {
def build = kafkaProtocol
import scala.concurrent.duration._
import io.gatling.core.Predef._
import Predef._
class Test extends Simulation {
val kafkaP = kafkaProtocol.properties(Map("bootstrap.servers" -> "localhost:9092"))
val jmsP = jmsProtocol.url("localhost:7222")