@joestein · Created January 22, 2012 04:32
KAFKA-240 patch (work in progress)
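
The diffs below rework ProducerRequest into a multi-topic, multi-partition wire format
(size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout,
then a topic/partition tree). For orientation, here is a minimal sketch of building one of
the new requests; the header values and the message-set constructor are illustrative
assumptions, not part of the patch:

    // Sketch only: one topic, one partition, illustrative header values.
    import kafka.api.{ProducerRequest, WiredTopic, WiredPartition, RequestKeys}
    import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}

    val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes))
    val data = Array(new WiredTopic("test-topic",
      Array(new WiredPartition(ProducerRequest.RandomPartition, messages))))
    val request = new ProducerRequest(
      0,                   // size; in practice computed via sizeInBytes (see the api diff below)
      RequestKeys.Produce, // request_type_id
      0,                   // version_id
      1,                   // correlation_id
      "client-1",          // client_id
      1,                   // required_acks
      3000,                // ack_timeout (ms)
      data)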
Index: core/src/main/scala/kafka/producer/SyncProducer.scala
===================================================================
--- core/src/main/scala/kafka/producer/SyncProducer.scala (revision 1234442)
+++ core/src/main/scala/kafka/producer/SyncProducer.scala (working copy)
@@ -107,15 +107,18 @@
/**
* Send a message
*/
- def send(topic: String, partition: Int, messages: ByteBufferMessageSet) {
- verifyMessageSize(messages)
- val setSize = messages.sizeInBytes.asInstanceOf[Int]
- trace("Got message set with " + setSize + " bytes to send")
- send(new BoundedByteBufferSend(new ProducerRequest(topic, partition, messages)))
+ def send(producerRequest: kafka.javaapi.ProducerRequest) {
+ producerRequest.data.foreach(d => {
+ d.partition_data.foreach(p => {
+ verifyMessageSize(p.messages)
+ val setSize = p.messages.sizeInBytes.asInstanceOf[Int]
+ trace("Got message set with " + setSize + " bytes to send")
+ })
+ })
+ send(new BoundedByteBufferSend(producerRequest.api())) //send the underlying kafka.api request, as the old per-topic path did
}
-
- def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic, ProducerRequest.RandomPartition, messages)
+/*TODO: delete
def multiSend(produces: Array[ProducerRequest]) {
for (request <- produces)
verifyMessageSize(request.messages)
@@ -123,7 +126,7 @@
trace("Got multi message sets with " + setSize + " bytes to send")
send(new BoundedByteBufferSend(new MultiProducerRequest(produces)))
}
-
+*/
def close() = {
lock synchronized {
disconnect()
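
With this change the Scala producer takes the whole request object: every topic and
partition is size-checked up front, then shipped in a single network send. A minimal
call site, assuming a request built as in the sketch near the top; illustrative, not
part of the patch:

    // Sketch: one send now carries the full multi-topic request.
    // javaRequest is a kafka.javaapi.ProducerRequest mirroring the kafka.api sketch above.
    val producer = new kafka.producer.SyncProducer(config) // config: a SyncProducerConfig
    producer.send(javaRequest) // verifies every partition's message set, then one network write
    producer.close()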
Index: core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (revision 1234442)
+++ core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (working copy)
@@ -64,25 +64,31 @@
None
}
- private def handleProducerRequest(request: ProducerRequest, requestHandlerName: String) = {
- val partition = request.getTranslatedPartition(logManager.chooseRandomPartition)
- try {
- logManager.getOrCreateLog(request.topic, partition).append(request.messages)
- trace(request.messages.sizeInBytes + " bytes written to logs.")
- request.messages.foreach(m => trace("wrote message %s to disk".format(m.message.checksum)))
- }
- catch {
- case e =>
- error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e)
- e match {
- case _: IOException =>
- fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
- Runtime.getRuntime.halt(1)
- case _ =>
+ private def handleProducerRequest(request: ProducerRequest, requestHandlerName: String): Option[ProducerResponse] = {
+ request.data.foreach(d => {
+ d.partition_data.foreach(p => {
+ val partition = p.getTranslatedPartition(d.topic, logManager.chooseRandomPartition)
+ try {
+ logManager.getOrCreateLog(d.topic, partition).append(p.messages)
+ trace(p.messages.sizeInBytes + " bytes written to logs.")
+ p.messages.foreach(m => trace("wrote message %s to disk".format(m.message.checksum)))
}
- throw e
- }
- None
+ catch {
+ case e =>
+ //TODO: handle response in ProducerResponse
+ error("Error processing " + requestHandlerName + " on " + d.topic + ":" + partition, e)
+ e match {
+ case _: IOException =>
+ fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
+ Runtime.getRuntime.halt(1)
+ case _ =>
+ }
+ //throw e
+ }
+ })
+ //None
+ })
+ Some(new ProducerResponse()) //TODO:
}
def handleFetchRequest(request: Receive): Option[Send] = {
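
A note on partition selection: the handler now resolves ProducerRequest.RandomPartition
(-1) per partition entry via WiredPartition.getTranslatedPartition (defined in the api
diff below). A small illustration, where chooseRandom: String => Int stands in for
logManager.chooseRandomPartition:

    // Illustration: -1 routes through the selector, a concrete partition passes through.
    val wired = new kafka.api.WiredPartition(kafka.api.ProducerRequest.RandomPartition, messages)
    val p = wired.getTranslatedPartition("test-topic", chooseRandom) // selector picks one
    val fixed = new kafka.api.WiredPartition(3, messages)
    val q = fixed.getTranslatedPartition("test-topic", chooseRandom) // q == 3, selector unused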
Index: core/src/main/scala/kafka/api/ProducerRequest.scala
===================================================================
--- core/src/main/scala/kafka/api/ProducerRequest.scala (revision 1234442)
+++ core/src/main/scala/kafka/api/ProducerRequest.scala (working copy)
@@ -22,47 +22,115 @@
import kafka.network._
import kafka.utils._
+class WiredTopic(val topic: String, val partition_data: Array[WiredPartition])
+
+class WiredPartition(val partition: Int, val messages: ByteBufferMessageSet) {
+ def getTranslatedPartition(topic: String, randomSelector: String => Int): Int = {
+ if (partition == ProducerRequest.RandomPartition)
+ return randomSelector(topic)
+ else
+ return partition
+ }
+}
+
object ProducerRequest {
val RandomPartition = -1
-
+
def readFrom(buffer: ByteBuffer): ProducerRequest = {
- val topic = Utils.readShortString(buffer, "UTF-8")
- val partition = buffer.getInt
- val messageSetSize = buffer.getInt
- val messageSetBuffer = buffer.slice()
- messageSetBuffer.limit(messageSetSize)
- buffer.position(buffer.position + messageSetSize)
- new ProducerRequest(topic, partition, new ByteBufferMessageSet(messageSetBuffer))
+ val size: Int = buffer.getInt
+ val request_type_id: Short = buffer.getShort
+ val version_id: Short = buffer.getShort
+ val correlation_id: Int = buffer.getInt
+ val client_id: String = Utils.readShortString(buffer, "UTF-8")
+ val required_acks: Short = buffer.getShort
+ val ack_timeout: Int = buffer.getInt
+ //build the topic structure
+ val topicCount = buffer.getInt
+ val data = new Array[WiredTopic](topicCount)
+ for(i <- 0 until topicCount) {
+ val topic = Utils.readShortString(buffer, "UTF-8")
+
+ val partitionCount = buffer.getInt
+ //build the partition structure within this topic
+ val partition_data = new Array[WiredPartition](partitionCount)
+ for (j <- 0 until partitionCount) {
+ val partition = buffer.getInt
+ val messageSetSize = buffer.getInt
+ val messageSetBuffer = new Array[Byte](messageSetSize)
+ buffer.get(messageSetBuffer, 0, messageSetSize) //offset and length are into the destination array; read exactly messageSetSize bytes
+ partition_data(j) = new WiredPartition(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer)))
+ }
+ data(i) = new WiredTopic(topic,partition_data)
+ }
+ new ProducerRequest(size,request_type_id,version_id,correlation_id,client_id,required_acks,ack_timeout,data)
}
}
-class ProducerRequest(val topic: String,
- val partition: Int,
- val messages: ByteBufferMessageSet) extends Request(RequestKeys.Produce) {
+class ProducerRequest(val size: Int,
+ val request_type_id: Short,
+ val version_id: Short,
+ val correlation_id: Int,
+ val client_id: String,
+ val required_acks: Short,
+ val ack_timeout: Int,
+ val data: Array[WiredTopic]) extends Request(RequestKeys.Produce) {
def writeTo(buffer: ByteBuffer) {
- Utils.writeShortString(buffer, topic, "UTF-8")
- buffer.putInt(partition)
- buffer.putInt(messages.serialized.limit)
- buffer.put(messages.serialized)
- messages.serialized.rewind
+ buffer.putInt(size)
+ buffer.putShort(request_type_id)
+ buffer.putShort(version_id)
+ buffer.putInt(correlation_id)
+ Utils.writeShortString(buffer, client_id, "UTF-8")
+ buffer.putShort(required_acks)
+ buffer.putInt(ack_timeout)
+ //save the topic structure
+ buffer.putInt(data.size) //the number of topics
+ data.foreach(d =>{
+ Utils.writeShortString(buffer, d.topic, "UTF-8") //write the topic
+ buffer.putInt(d.partition_data.size) //the number of partitions
+ d.partition_data.foreach(p => {
+ buffer.putInt(p.partition)
+ buffer.putInt(p.messages.serialized.limit)
+ buffer.put(p.messages.serialized)
+ p.messages.serialized.rewind
+ })
+ })
}
-
- def sizeInBytes(): Int = 2 + topic.length + 4 + 4 + messages.sizeInBytes.asInstanceOf[Int]
- def getTranslatedPartition(randomSelector: String => Int): Int = {
- if (partition == ProducerRequest.RandomPartition)
- return randomSelector(topic)
- else
- return partition
+ def sizeInBytes(): Int = {
+ //size(4) + request_type_id(2) + version_id(2) + correlation_id(4) + client_id short string(2 + length) + required_acks(2) + ack_timeout(4) + topic count(4)
+ var size = 4 + 2 + 2 + 4 + 2 + client_id.length + 2 + 4 + 4
+ data.foreach(d =>{
+ size += 2 + d.topic.length
+ d.partition_data.foreach(p => {
+ size += 4 + 4 + p.messages.sizeInBytes.asInstanceOf[Int]
+ })
+ })
+ size
}
+
override def toString: String = {
val builder = new StringBuilder()
builder.append("ProducerRequest(")
- builder.append(topic + ",")
- builder.append(partition + ",")
- builder.append(messages.sizeInBytes)
+ builder.append(size + ",")
+ builder.append(request_type_id + ",")
+ builder.append(version_id + ",")
+ builder.append(correlation_id + ",")
+ builder.append(client_id + ",")
+ builder.append(required_acks + ",")
+ builder.append(ack_timeout)
+ data.foreach(d =>{
+ builder.append(":[" + d.topic)
+ d.partition_data.foreach(p => {
+ builder.append(":[")
+ builder.append(p.partition + ",")
+ builder.append(p.messages.sizeInBytes)
+ builder.append("]")
+ })
+ builder.append("]")
+ })
builder.append(")")
builder.toString
}
@@ -70,14 +138,37 @@
override def equals(other: Any): Boolean = {
other match {
case that: ProducerRequest =>
- (that canEqual this) && topic == that.topic && partition == that.partition &&
- messages.equals(that.messages)
+ if (!(that canEqual this)) return false
+ if (size != that.size || request_type_id != that.request_type_id ||
+ version_id != that.version_id || correlation_id != that.correlation_id ||
+ client_id != that.client_id || required_acks != that.required_acks ||
+ ack_timeout != that.ack_timeout || data.size != that.data.size) return false
+ for(i <- 0 until data.size) {
+ if (data(i).topic != that.data(i).topic || data(i).partition_data.size != that.data(i).partition_data.size)
+ return false
+ for(j <- 0 until data(i).partition_data.size)
+ if (data(i).partition_data(j).partition != that.data(i).partition_data(j).partition || !data(i).partition_data(j).messages.equals(that.data(i).partition_data(j).messages))
+ return false
+ }
+ true //previously fell through to false even when every field matched
case _ => false
}
}
def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest]
- override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode
-
+ override def hashCode: Int = {
+ def hcp(num: Int): Int = {
+ 31 + (17 * num)
+ }
+ var hash = hcp(size) + hcp(request_type_id) + hcp(version_id) + hcp(correlation_id) + client_id.hashCode + hcp(required_acks) + hcp(ack_timeout)
+ data.foreach(d =>{
+ hash += d.topic.hashCode
+ d.partition_data.foreach(p => {
+ hash += hcp(p.partition) + p.messages.hashCode
+ })
+ })
+ hash
+ }
}
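
For reference, the sizeInBytes arithmetic for a one-topic, one-partition request with a
4-character client_id ("test"), a 4-character topic ("demo"), and a 100-byte message set
(lengths illustrative):

    // fixed header: size(4) + request_type_id(2) + version_id(2) + correlation_id(4)
    //               + client_id(2 + 4) + required_acks(2) + ack_timeout(4) + topic count(4) = 28
    // topic entry:  short string(2 + 4)                                                     =  6
    // partition:    partition(4) + set size(4) + 100-byte message set                       = 108
    // total:                                                                                  142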
Index: core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala (revision 1234442)
+++ core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala (working copy)
@@ -25,23 +25,23 @@
val underlying = syncProducer
- def send(topic: String, partition: Int, messages: ByteBufferMessageSet) {
+ def send(size: Int, request_type_id: Short, version_id: Short, correlation_id: Int, client_id: String, required_acks: Short, ack_timeout: Int, topic: String, partition: Int, messages: ByteBufferMessageSet) {
import kafka.javaapi.Implicits._
- underlying.send(topic, partition, messages)
+ //underlying.send now takes a whole request; wrap the single topic/partition in the new wire structures (messages converts via Implicits)
+ val data = Array(new kafka.api.WiredTopic(topic, Array(new kafka.api.WiredPartition(partition, messages))))
+ underlying.send(new kafka.javaapi.ProducerRequest(size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout, data))
}
- def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic,
- kafka.api.ProducerRequest.RandomPartition,
- messages)
+ def send(size: Int, request_type_id: Short, version_id: Short, correlation_id: Int, client_id: String, required_acks: Short, ack_timeout: Int, topic: String, messages: ByteBufferMessageSet): Unit = send(size, request_type_id, version_id,
+ correlation_id, client_id, required_acks, ack_timeout, topic, kafka.api.ProducerRequest.RandomPartition, messages)
+/*TODO: delete
def multiSend(produces: Array[kafka.javaapi.ProducerRequest]) {
import kafka.javaapi.Implicits._
val produceRequests = new Array[kafka.api.ProducerRequest](produces.length)
for(i <- 0 until produces.length)
- produceRequests(i) = new kafka.api.ProducerRequest(produces(i).topic, produces(i).partition, produces(i).messages)
+ produceRequests(i) = produces(i).api()
underlying.multiSend(produceRequests)
}
-
+*/
def close() {
underlying.close
}
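
Since the javaapi wrapper delegates writeTo/readFrom/sizeInBytes to the underlying
kafka.api request, a quick serialization round trip can sanity-check the new format
(sketch; assumes size was set to request.sizeInBytes and the readFrom fix above):

    // Sketch: write the request out, then parse it back.
    val buf = java.nio.ByteBuffer.allocate(request.sizeInBytes)
    request.writeTo(buf)
    buf.rewind()
    val parsed = kafka.api.ProducerRequest.readFrom(buf)
    assert(parsed.equals(request)) // relies on the corrected equals in the api diff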
Index: core/src/main/scala/kafka/javaapi/ProducerRequest.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/ProducerRequest.scala (revision 1234442)
+++ core/src/main/scala/kafka/javaapi/ProducerRequest.scala (working copy)
@@ -17,36 +17,34 @@
package kafka.javaapi
import kafka.network.Request
-import kafka.api.RequestKeys
+import kafka.api.{RequestKeys, WiredTopic}
import java.nio.ByteBuffer
-class ProducerRequest(val topic: String,
- val partition: Int,
- val messages: kafka.javaapi.message.ByteBufferMessageSet) extends Request(RequestKeys.Produce) {
+class ProducerRequest(val size: Int,
+ val request_type_id: Short,
+ val version_id: Short,
+ val correlation_id: Int,
+ val client_id: String,
+ val required_acks: Short,
+ val ack_timeout: Int,
+ val data: Array[WiredTopic]) extends Request(RequestKeys.Produce) {
+
import Implicits._
- private val underlying = new kafka.api.ProducerRequest(topic, partition, messages)
+ private val underlying = new kafka.api.ProducerRequest(size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout, data)
+ def api(): kafka.api.ProducerRequest = underlying
+
def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) }
def sizeInBytes(): Int = underlying.sizeInBytes
- def getTranslatedPartition(randomSelector: String => Int): Int =
- underlying.getTranslatedPartition(randomSelector)
-
override def toString: String =
underlying.toString
- override def equals(other: Any): Boolean = {
- other match {
- case that: ProducerRequest =>
- (that canEqual this) && topic == that.topic && partition == that.partition &&
- messages.equals(that.messages)
- case _ => false
- }
- }
+ override def equals(other: Any): Boolean = other match {
+ case that: ProducerRequest => underlying.equals(that.api()) //compare the underlying kafka.api requests; a javaapi instance never equals a kafka.api one directly
+ case _ => false
+ }
def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest]
- override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode
+ override def hashCode: Int = underlying.hashCode
}