Created
January 22, 2012 04:32
-
-
Save joestein/1655565 to your computer and use it in GitHub Desktop.
kafka-240patch in progress
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Index: core/src/main/scala/kafka/producer/SyncProducer.scala | |
=================================================================== | |
--- core/src/main/scala/kafka/producer/SyncProducer.scala (revision 1234442) | |
+++ core/src/main/scala/kafka/producer/SyncProducer.scala (working copy) | |
@@ -107,15 +107,18 @@ | |
/** | |
* Send a message | |
*/ | |
- def send(topic: String, partition: Int, messages: ByteBufferMessageSet) { | |
- verifyMessageSize(messages) | |
- val setSize = messages.sizeInBytes.asInstanceOf[Int] | |
- trace("Got message set with " + setSize + " bytes to send") | |
- send(new BoundedByteBufferSend(new ProducerRequest(topic, partition, messages))) | |
+ def send(producerRequest: kafka.javaapi.ProducerRequest) { | |
+ producerRequest.foreach(d => { | |
+ d.foreach(p => { | |
+ verifyMessageSize(p.messages) | |
+ val setSize = p.messages.sizeInBytes.asInstanceOf[Int] | |
+ trace("Got message set with " + setSize + " bytes to send") | |
+ }) | |
+ }) | |
+ send(producerRequest) | |
} | |
- | |
- def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic, ProducerRequest.RandomPartition, messages) | |
+/*TODO: delete | |
def multiSend(produces: Array[ProducerRequest]) { | |
for (request <- produces) | |
verifyMessageSize(request.messages) | |
@@ -123,7 +126,7 @@ | |
trace("Got multi message sets with " + setSize + " bytes to send") | |
send(new BoundedByteBufferSend(new MultiProducerRequest(produces))) | |
} | |
- | |
+*/ | |
def close() = { | |
lock synchronized { | |
disconnect() | |
Index: core/src/main/scala/kafka/server/KafkaRequestHandlers.scala | |
=================================================================== | |
--- core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (revision 1234442) | |
+++ core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (working copy) | |
@@ -64,25 +64,31 @@ | |
None | |
} | |
- private def handleProducerRequest(request: ProducerRequest, requestHandlerName: String) = { | |
- val partition = request.getTranslatedPartition(logManager.chooseRandomPartition) | |
- try { | |
- logManager.getOrCreateLog(request.topic, partition).append(request.messages) | |
- trace(request.messages.sizeInBytes + " bytes written to logs.") | |
- request.messages.foreach(m => trace("wrote message %s to disk".format(m.message.checksum))) | |
- } | |
- catch { | |
- case e => | |
- error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e) | |
- e match { | |
- case _: IOException => | |
- fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e) | |
- Runtime.getRuntime.halt(1) | |
- case _ => | |
+ private def handleProducerRequest(request: ProducerRequest, requestHandlerName: String): ProducerResponse = { | |
+ request.data.foreach(d => { | |
+ d.partition_data.foreach(p => { | |
+ val partition = p.getTranslatedPartition(d.topic, logManager.chooseRandomPartition) | |
+ try { | |
+ logManager.getOrCreateLog(d.topic, partition).append(p.messages) | |
+ trace(p.messages.sizeInBytes + " bytes written to logs.") | |
+ p.messages.foreach(m => trace("wrote message %s to disk".format(m.message.checksum))) | |
} | |
- throw e | |
- } | |
- None | |
+ catch { | |
+ case e => | |
+ //TODO: handle response in ProducerResponse | |
+ error("Error processing " + requestHandlerName + " on " + d.topic + ":" + partition, e) | |
+ e match { | |
+ case _: IOException => | |
+ fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e) | |
+ Runtime.getRuntime.halt(1) | |
+ case _ => | |
+ } | |
+ //throw e | |
+ } | |
+ }) | |
+ //None | |
+ }) | |
+ Some(new ProducerResponse()) //TODO: | |
} | |
def handleFetchRequest(request: Receive): Option[Send] = { | |
Index: core/src/main/scala/kafka/api/ProducerRequest.scala | |
=================================================================== | |
--- core/src/main/scala/kafka/api/ProducerRequest.scala (revision 1234442) | |
+++ core/src/main/scala/kafka/api/ProducerRequest.scala (working copy) | |
@@ -22,47 +22,115 @@ | |
import kafka.network._ | |
import kafka.utils._ | |
+class WiredTopic(val topic: String, val partition_data: Array[WiredPartition]) | |
+ | |
+class WiredPartition(val partition: Int, val messages: ByteBufferMessageSet) { | |
+ def getTranslatedPartition(topic: String, randomSelector: String => Int): Int = { | |
+ if (partition == ProducerRequest.RandomPartition) | |
+ return randomSelector(topic) | |
+ else | |
+ return partition | |
+ } | |
+} | |
+ | |
object ProducerRequest { | |
val RandomPartition = -1 | |
- | |
+ | |
def readFrom(buffer: ByteBuffer): ProducerRequest = { | |
- val topic = Utils.readShortString(buffer, "UTF-8") | |
- val partition = buffer.getInt | |
- val messageSetSize = buffer.getInt | |
- val messageSetBuffer = buffer.slice() | |
- messageSetBuffer.limit(messageSetSize) | |
- buffer.position(buffer.position + messageSetSize) | |
- new ProducerRequest(topic, partition, new ByteBufferMessageSet(messageSetBuffer)) | |
+ val size: Int = buffer.getInt | |
+ val request_type_id: Short = buffer.getShort | |
+ val version_id: Short = buffer.getShort | |
+ val correlation_id: Int = buffer.getInt | |
+ val client_id: String = Utils.readShortString(buffer, "UTF-8") | |
+ val required_acks: Short = buffer.getShort | |
+ val ack_timeout: Int = buffer.getInt | |
+ //build the topic structure | |
+ val topicCount = buffer.getInt | |
+ val data = new Array[WiredTopic](topicCount) | |
+ for(i <- 0 until topicCount) { | |
+ val topic = Utils.readShortString(buffer, "UTF-8") | |
+ | |
+ val partitionCount = buffer.getInt | |
+ //build the partition structure within this topic | |
+ val partition_data = new Array[WiredPartition](partitionCount) | |
+ for (j <- 0 until partitionCount) { | |
+ val partition = buffer.getInt | |
+ val messageSetSize = buffer.getInt | |
+ val messageSetBuffer = new Array[Byte](messageSetSize) | |
+ buffer.get(messageSetBuffer,0,buffer.position) | |
+ partition_data(j) = new WiredPartition(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer))) | |
+ } | |
+ data(i) = new WiredTopic(topic,partition_data) | |
+ } | |
+ new ProducerRequest(size,request_type_id,version_id,correlation_id,client_id,required_acks,ack_timeout,data) | |
} | |
} | |
-class ProducerRequest(val topic: String, | |
- val partition: Int, | |
- val messages: ByteBufferMessageSet) extends Request(RequestKeys.Produce) { | |
+class ProducerRequest(val size: Int, | |
+ val request_type_id: Short, | |
+ val version_id: Short, | |
+ val correlation_id: Int, | |
+ val client_id: String, | |
+ val required_acks: Short, | |
+ val ack_timeout: Int, | |
+ val data: Array[WiredTopic]) extends Request(RequestKeys.Produce) { | |
def writeTo(buffer: ByteBuffer) { | |
- Utils.writeShortString(buffer, topic, "UTF-8") | |
- buffer.putInt(partition) | |
- buffer.putInt(messages.serialized.limit) | |
- buffer.put(messages.serialized) | |
- messages.serialized.rewind | |
+ buffer.putInt(size) | |
+ buffer.putShort(request_type_id) | |
+ buffer.putShort(version_id) | |
+ buffer.putInt(correlation_id) | |
+ Utils.writeShortString(buffer, client_id, "UTF-8") | |
+ buffer.putShort(required_acks) | |
+ buffer.putInt(ack_timeout) | |
+ //save the topic structure | |
+ buffer.putInt(data.size) //the number of topics | |
+ data.foreach(d =>{ | |
+ Utils.writeShortString(buffer, d.topic, "UTF-8") //write the topic | |
+ buffer.putInt(d.partition_data.size) //the number of partitions | |
+ d.partition_data.foreach(p => { | |
+ buffer.putInt(p.partition) | |
+ buffer.putInt(p.messages.serialized.limit) | |
+ buffer.put(p.messages.serialized) | |
+ p.messages.serialized.rewind | |
+ }) | |
+ }) | |
} | |
- | |
- def sizeInBytes(): Int = 2 + topic.length + 4 + 4 + messages.sizeInBytes.asInstanceOf[Int] | |
- def getTranslatedPartition(randomSelector: String => Int): Int = { | |
- if (partition == ProducerRequest.RandomPartition) | |
- return randomSelector(topic) | |
- else | |
- return partition | |
+ def sizeInBytes(): Int = { | |
+ var size = 0 | |
+ //size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout, data.size | |
+ size = 4 + 2 + 2 + 4 + 2 + client_id.length + 2 + 4 + 4; | |
+ data.foreach(d =>{ | |
+ size += 2 + d.topic.length | |
+ d.partition_data.foreach(p => { | |
+ size += 4 + 4 + p.messages.sizeInBytes.asInstanceOf[Int] | |
+ }) | |
+ }) | |
+ size | |
} | |
+ | |
override def toString: String = { | |
val builder = new StringBuilder() | |
builder.append("ProducerRequest(") | |
- builder.append(topic + ",") | |
- builder.append(partition + ",") | |
- builder.append(messages.sizeInBytes) | |
+ builder.append(size + ",") | |
+ builder.append(request_type_id + ",") | |
+ builder.append(version_id + ",") | |
+ builder.append(correlation_id + ",") | |
+ builder.append(client_id + ",") | |
+ builder.append(required_acks + ",") | |
+ builder.append(ack_timeout) | |
+ data.foreach(d =>{ | |
+ builder.append(":[" + d.topic) | |
+ d.partition_data.foreach(p => { | |
+ builder.append(":[") | |
+ builder.append(p.partition + ",") | |
+ builder.append(p.messages.sizeInBytes) | |
+ builder.append("]") | |
+ }) | |
+ builder.append("]") | |
+ }) | |
builder.append(")") | |
builder.toString | |
} | |
@@ -70,14 +138,37 @@ | |
override def equals(other: Any): Boolean = { | |
other match { | |
case that: ProducerRequest => | |
- (that canEqual this) && topic == that.topic && partition == that.partition && | |
- messages.equals(that.messages) | |
+ if (that canEqual this) | |
+ if (size == that.size && request_type_id == that.request_type_id && | |
+ version_id == that.version_id && correlation_id == that.correlation_id && | |
+ client_id == that.client_id && required_acks == that.required_acks && ack_timeout == that.ack_timeout) { | |
+ for(i <- 0 until data.size) { | |
+ if (data(i).topic != that.data(i).topic) | |
+ return false | |
+ for(j <- 0 until data(i).partition_data.size) | |
+ if (data(i).partition_data(j).partition != that.data(i).partition_data(j).partition || !data(i).partition_data(j).messages.equals(that.data(i).partition_data(j).messages)) | |
+ return false | |
+ } | |
+ true | |
+ } | |
+ false | |
case _ => false | |
} | |
} | |
def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest] | |
- override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode | |
- | |
+ override def hashCode: Int = { | |
+ def hcp(num: Int): Int = { | |
+ 31 + (17 * num) | |
+ } | |
+ var hash = hcp(size) + hcp(request_type_id) + hcp(version_id) + hcp(correlation_id) + client_id.hashCode + hcp(required_acks) + hcp(ack_timeout) | |
+ data.foreach(d =>{ | |
+ hash += d.topic.hashCode | |
+ d.partition_data.foreach(p => { | |
+ hash += hcp(p.partition) + p.messages.hashCode | |
+ }) | |
+ }) | |
+ hash | |
+ } | |
} | |
Index: core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala | |
=================================================================== | |
--- core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala (revision 1234442) | |
+++ core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala (working copy) | |
@@ -25,23 +25,23 @@ | |
val underlying = syncProducer | |
- def send(topic: String, partition: Int, messages: ByteBufferMessageSet) { | |
+ def send(size: Int, request_type_id: Short, version_id: Short, correlation_id: Int, client_id: String, required_acks: Short, ack_timeout: Int, topic: String, partition: Int, messages: ByteBufferMessageSet) { | |
import kafka.javaapi.Implicits._ | |
- underlying.send(topic, partition, messages) | |
+ underlying.send(size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout, topic, partition, messages) | |
} | |
- def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic, | |
- kafka.api.ProducerRequest.RandomPartition, | |
- messages) | |
+ def send(size: Int, request_type_id: Short, version_id: Short, correlation_id: Int, client_id: String, required_acks: Short, ack_timeout: Int, topic: String, messages: ByteBufferMessageSet): Unit = send(size, request_type_id, version_id, | |
+ correlation_id, client_id, required_acks, ack_timeout, topic, kafka.api.ProducerRequest.RandomPartition, messages) | |
+/*TODO: delete | |
def multiSend(produces: Array[kafka.javaapi.ProducerRequest]) { | |
import kafka.javaapi.Implicits._ | |
val produceRequests = new Array[kafka.api.ProducerRequest](produces.length) | |
for(i <- 0 until produces.length) | |
- produceRequests(i) = new kafka.api.ProducerRequest(produces(i).topic, produces(i).partition, produces(i).messages) | |
+ produceRequests(i) = produces(i).api() | |
underlying.multiSend(produceRequests) | |
} | |
- | |
+*/ | |
def close() { | |
underlying.close | |
} | |
Index: core/src/main/scala/kafka/javaapi/ProducerRequest.scala | |
=================================================================== | |
--- core/src/main/scala/kafka/javaapi/ProducerRequest.scala (revision 1234442) | |
+++ core/src/main/scala/kafka/javaapi/ProducerRequest.scala (working copy) | |
@@ -17,36 +17,34 @@ | |
package kafka.javaapi | |
import kafka.network.Request | |
-import kafka.api.RequestKeys | |
+import kafka.api.{RequestKeys, WiredTopic} | |
import java.nio.ByteBuffer | |
-class ProducerRequest(val topic: String, | |
- val partition: Int, | |
- val messages: kafka.javaapi.message.ByteBufferMessageSet) extends Request(RequestKeys.Produce) { | |
+class ProducerRequest(val size: Int, | |
+ val request_type_id: Short, | |
+ val version_id: Short, | |
+ val correlation_id: Int, | |
+ val client_id: String, | |
+ val required_acks: Short, | |
+ val ack_timeout: Int, | |
+ val data: Array[WiredTopic]) extends Request(RequestKeys.Produce) { | |
+ | |
import Implicits._ | |
- private val underlying = new kafka.api.ProducerRequest(topic, partition, messages) | |
+ private val underlying = new kafka.api.ProducerRequest(size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout, data) | |
+ def api(): kafka.api.ProducerRequest = underlying | |
+ | |
def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) } | |
def sizeInBytes(): Int = underlying.sizeInBytes | |
- def getTranslatedPartition(randomSelector: String => Int): Int = | |
- underlying.getTranslatedPartition(randomSelector) | |
- | |
override def toString: String = | |
underlying.toString | |
- override def equals(other: Any): Boolean = { | |
- other match { | |
- case that: ProducerRequest => | |
- (that canEqual this) && topic == that.topic && partition == that.partition && | |
- messages.equals(that.messages) | |
- case _ => false | |
- } | |
- } | |
+ override def equals(other: Any): Boolean = underlying.equals(other) | |
def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest] | |
- override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode | |
+ override def hashCode: Int = underlying.hashCode | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment