@vhutov
Created March 12, 2018 22:44
import scala.collection.JavaConverters._

import org.apache.kafka.clients.producer.KafkaProducer

class KafkaRequestActionBuilder[K, V <: Message](
    topic: String, // these parameters will be provided when building KafkaPublisher
    keyExtractor: V => Option[K]) extends RequestActionBuilder[V] {

  override type PK = KafkaProtocol.KafkaProtocolKey.type
  override def key = KafkaProtocol.KafkaProtocolKey

  // Creates a Kafka producer from the protocol properties and wraps it in a publisher for the topic.
  override def build(ctx: ScenarioContext, components: KafkaComponents): Producer[V] = {
    val producer = new KafkaProducer[K, V](components.protocol.properties.asJava)
    new KafkaPublisher[K, V](producer, topic)(keyExtractor)
  }
}