Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save maciekciolek/c645e28cc708b26177eb1444d077d09a to your computer and use it in GitHub Desktop.
Save maciekciolek/c645e28cc708b26177eb1444d077d09a to your computer and use it in GitHub Desktop.
Example of KafkaBatchProducer
import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit
import org.apache.kafka.clients._
import org.apache.kafka.common.metrics.{MetricConfig, Metrics, MetricsReporter}
import org.apache.kafka.common.network.Selector
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.record.{CompressionType, MemoryRecords}
import org.apache.kafka.common.requests.{ProduceRequest, RequestSend}
import org.apache.kafka.common.utils.SystemTime
import org.apache.kafka.common.{Cluster, TopicPartition}
import scala.collection.JavaConverters._
import scala.concurrent.{Future, Promise}
/**
* @author mciolek
*/
case class SampleRecord(key: String, value: String)
class KafkaBatchProducer(clientId: String, addresses: List[String]) {
private val props = Map[String, AnyRef]("security.protocol" -> "PLAINTEXT").asJava
private val time = new SystemTime
private val channelBuilder = ClientUtils.createChannelBuilder(props)
private val metricTags = Map("client-id" -> clientId).asJava
private val metricConfig = new MetricConfig()
.samples(10)
.timeWindow(1000, TimeUnit.MILLISECONDS)
.tags(metricTags)
private val metricReports = List.empty[MetricsReporter].asJava
private val metrics = new Metrics(metricConfig, metricReports, time)
private val selector = new Selector(30000, metrics, time, "producer", channelBuilder)
private val metadata = new Metadata(1000, 10000)
metadata.update(Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(addresses.asJava)), time.milliseconds)
private val client: KafkaClient = new NetworkClient(
selector,
metadata,
clientId,
5,
1000,
102400,
32768,
30000,
time
)
def sendBatch(topic: String, partition: Int, batch: List[SampleRecord]): Future[ClientResponse] = {
//The node should be the leader of topic and partition
val node = metadata.fetch().nodes().get(0)
if (client.isReady(node, time.milliseconds())) {
val topicPartition = new TopicPartition(topic, partition)
val buffer = ByteBuffer.allocate(1024)
val records = MemoryRecords.emptyRecords(buffer, CompressionType.NONE)
val promise = Promise[ClientResponse]
batch
.zipWithIndex
.foreach { case (SampleRecord(key, value), offset) =>
records.append(offset, time.milliseconds(), key.getBytes("UTF-8"), value.getBytes("UTF-8"))
}
records.close()
val partitionRecords = Map(topicPartition -> records.buffer())
val produceRequest = new ProduceRequest(1, 1000, partitionRecords.asJava)
val sendRequest = new RequestSend(String.valueOf(node.id()), client.nextRequestHeader(ApiKeys.PRODUCE), produceRequest.toStruct)
val clientRequest = new ClientRequest(time.milliseconds(), true, sendRequest, new RequestCompletionHandler {
override def onComplete(response: ClientResponse): Unit =
//The response should be analyzed here in order to fulfill promise with success or error
promise.success(response)
})
client.send(clientRequest, time.milliseconds())
promise.future
} else {
Future.failed(new IllegalStateException("Leader is not ready."))
}
}
def poolNewData() = client.poll(1000, time.milliseconds())
//Ugly part...
//Ensure, that Kafka will start connections...
metadata.fetch().nodes().asScala.foreach(node => client.ready(node, time.milliseconds()))
//Wait until the client is ready to hand
while (!client.isReady(metadata.fetch().nodes().get(0), time.milliseconds())) {
client.poll(1000, time.milliseconds())
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment