kafka-240 ProducerRequest
Gist joestein/1735455, created February 4, 2012
Index: core/src/main/scala/kafka/producer/ProducerPool.scala
===================================================================
--- core/src/main/scala/kafka/producer/ProducerPool.scala (revision 1234442)
+++ core/src/main/scala/kafka/producer/ProducerPool.scala (working copy)
@@ -26,6 +26,8 @@
 import kafka.common.{UnavailableProducerException, InvalidConfigException}
 import kafka.utils.{Utils, Logging}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet}
+import collection.mutable.{HashMap, ListBuffer}
+import kafka.api.{WiredTopic, WiredPartition}

 class ProducerPool[V](private val config: ProducerConfig,
                       private val serializer: Encoder[V],
@@ -94,7 +96,7 @@
    * producer to publish the data to the specified broker partition
    * @param poolData the producer pool request object
    */
-  def send(poolData: ProducerPoolData[V]*) {
+  def send(correlation_id: Int, poolData: ProducerPoolData[V]*) {
     val distinctBrokers = poolData.map(pd => pd.getBidPid.brokerId).distinct
     var remainingRequests = poolData.toSeq
     distinctBrokers.foreach { bid =>
@@ -102,18 +104,20 @@
       remainingRequests = requestsForThisBid._2

       if(sync) {
-        val producerRequests = requestsForThisBid._1.map(req => new ProducerRequest(req.getTopic, req.getBidPid.partId,
-          new ByteBufferMessageSet(compressionCodec = config.compressionCodec,
-                                   messages = req.getData.map(d => serializer.toMessage(d)): _*)))
+        val client_id = "TODO" //TODO: KAFKA-240 read this from the configuration file
+        val required_acks: Short = 0 //TODO: KAFKA-240 read this from the configuration file
+        val ack_timeout = 0 //TODO: KAFKA-240 read this from the configuration file
+        val topics = new HashMap[String, ListBuffer[WiredPartition]]()
+        requestsForThisBid._1.foreach(req => {
+          //create this topic's partition list on first sight, then append to it
+          topics.getOrElseUpdate(req.getTopic, new ListBuffer[WiredPartition]())
+            .append(new WiredPartition(req.getBidPid.partId,
+              new ByteBufferMessageSet(compressionCodec = config.compressionCodec,
+                                       messages = req.getData.map(d => serializer.toMessage(d)): _*)))
+        })
+        val wired_topics = topics.map(kv => new WiredTopic(kv._1, kv._2.toArray))
+        val producerRequest = new kafka.javaapi.ProducerRequest(correlation_id, client_id, required_acks, ack_timeout, wired_topics.toArray)
         debug("Fetching sync producer for broker id: " + bid)
         val producer = syncProducers.get(bid)
-        if(producer != null) {
-          if(producerRequests.size > 1)
-            producer.multiSend(producerRequests.toArray)
-          else
-            producer.send(topic = producerRequests(0).topic,
-                          partition = producerRequests(0).partition,
-                          messages = producerRequests(0).messages)
+        if(producer != null) {
+          producer.send(producerRequest)
           config.compressionCodec match {
             case NoCompressionCodec => debug("Sending message to broker " + bid)
             case _ => debug("Sending compressed messages to broker " + bid)
Index: core/src/main/scala/kafka/producer/SyncProducer.scala
===================================================================
--- core/src/main/scala/kafka/producer/SyncProducer.scala (revision 1234442)
+++ core/src/main/scala/kafka/producer/SyncProducer.scala (working copy)
@@ -50,7 +50,8 @@
     if (logger.isTraceEnabled) {
       trace("verifying sendbuffer of size " + buffer.limit)
       val requestTypeId = buffer.getShort()
-      if (requestTypeId == RequestKeys.MultiProduce) {
+      //TODO: KAFKA-240 need to understand purpose of this
+      /* if (requestTypeId == RequestKeys.MultiProduce) {
         try {
           val request = MultiProducerRequest.readFrom(buffer)
           for (produce <- request.produces) {
@@ -69,7 +70,7 @@
           case e: Throwable =>
             trace("error verifying sendbuffer ", e)
         }
-      }
+      }*/
     }
   }
@@ -107,15 +108,18 @@
  /**
   * Send a message
   */
-  def send(topic: String, partition: Int, messages: ByteBufferMessageSet) {
-    verifyMessageSize(messages)
-    val setSize = messages.sizeInBytes.asInstanceOf[Int]
-    trace("Got message set with " + setSize + " bytes to send")
-    send(new BoundedByteBufferSend(new ProducerRequest(topic, partition, messages)))
+  def send(producerRequest: kafka.javaapi.ProducerRequest) {
+    producerRequest.data.foreach(d => {
+      d.partition_data.foreach(p => {
+        verifyMessageSize(p.messages)
+        val setSize = p.messages.sizeInBytes.asInstanceOf[Int]
+        trace("Got message set with " + setSize + " bytes to send")
+      })
+    })
+    send(new BoundedByteBufferSend(producerRequest.api())) //wrap for the private network-level send
  }
-
-  def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic, ProducerRequest.RandomPartition, messages)
+/*TODO: delete
  def multiSend(produces: Array[ProducerRequest]) {
    for (request <- produces)
      verifyMessageSize(request.messages)
@@ -123,7 +127,7 @@
    trace("Got multi message sets with " + setSize + " bytes to send")
    send(new BoundedByteBufferSend(new MultiProducerRequest(produces)))
  }
-
+*/
  def close() = {
    lock synchronized {
      disconnect()
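With multiSend removed, a caller hands the sync producer one fully batched request. A hypothetical usage sketch (sendBatched and its hard-coded client id, acks, and timeout values are illustrative only):

//one batched request replaces a multiSend of N single-topic requests
def sendBatched(producer: kafka.producer.SyncProducer, correlationId: Int,
                topics: Array[kafka.api.WiredTopic]) {
  val request = new kafka.javaapi.ProducerRequest(correlationId, "client", 0, 0, topics)
  producer.send(request) //verifies every message set, then issues a single network send
}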
Index: core/src/main/scala/kafka/producer/Producer.scala
===================================================================
--- core/src/main/scala/kafka/producer/Producer.scala (revision 1234442)
+++ core/src/main/scala/kafka/producer/Producer.scala (working copy)
@@ -93,6 +93,8 @@
    this(config, if(partitioner == null) new DefaultPartitioner[K] else partitioner,
         new ProducerPool[V](config, encoder, eventHandler, cbkHandler), true, null)

+  var correlation_id: Int = 0 //allows the client to send request specific information through the producer
+
  /**
   * Sends the data, partitioned by key to the topic using either the
   * synchronous or the asynchronous producer
@@ -138,7 +140,7 @@
                                 new Partition(brokerIdPartition.get.brokerId, brokerIdPartition.get.partId),
                                 pd.getData)
     }
-    producerPool.send(producerPoolRequests: _*)
+    producerPool.send(correlation_id, producerPoolRequests: _*)
   }

  private def configSend(producerData: ProducerData[K,V]*) {
@@ -160,7 +162,7 @@
                                 new Partition(brokerIdPartition.brokerId, partition),
                                 pd.getData)
     }
-    producerPool.send(producerPoolRequests: _*)
+    producerPool.send(correlation_id, producerPoolRequests: _*)
   }

  private def getPartitionListForTopic(pd: ProducerData[K,V]): Seq[Partition] = {
Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (revision 1234442)
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (working copy)
@@ -46,8 +46,9 @@
  private def send(messagesPerTopic: Map[(String, Int), ByteBufferMessageSet], syncProducer: SyncProducer) {
    if(messagesPerTopic.size > 0) {
-      val requests = messagesPerTopic.map(f => new ProducerRequest(f._1._1, f._1._2, f._2)).toArray
-      syncProducer.multiSend(requests)
+      //TODO KAFKA-240
+      //val requests = messagesPerTopic.map(f => new ProducerRequest(f._1._1, f._1._2, f._2)).toArray
+      //syncProducer.multiSend(requests)
      trace("kafka producer sent messages for topics %s to broker %s:%d"
        .format(messagesPerTopic, syncProducer.config.host, syncProducer.config.port))
    }
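The async path still needs its KAFKA-240 replacement. One plausible sketch of the TODO above (hypothetical, not part of the patch): fold the ((topic, partition) -> messages) map into the new request shape so it can be sent in a single call.

import kafka.api.{WiredPartition, WiredTopic}
import kafka.message.ByteBufferMessageSet

//hypothetical replacement for the commented-out multiSend above
def toWiredTopics(messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]): Array[WiredTopic] =
  messagesPerTopic.groupBy(_._1._1).map { case (topic, entries) =>
    new WiredTopic(topic, entries.map { case ((_, partition), messages) =>
      new WiredPartition(partition, messages)
    }.toArray)
  }.toArray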
Index: core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (revision 1234442)
+++ core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (working copy)
@@ -32,8 +32,6 @@
  */
 private[kafka] class KafkaRequestHandlers(val logManager: LogManager) extends Logging {

-  private val requestLogger = Logger.getLogger("kafka.request.logger")
-
  def handlerFor(requestTypeId: Short, request: Receive): Handler.Handler = {
    requestTypeId match {
      case RequestKeys.Produce => handleProducerRequest _
@@ -48,12 +46,9 @@
  def handleProducerRequest(receive: Receive): Option[Send] = {
    val sTime = SystemTime.milliseconds
    val request = ProducerRequest.readFrom(receive.buffer)
-
-    if(requestLogger.isTraceEnabled)
-      requestLogger.trace("Producer request " + request.toString)
-    handleProducerRequest(request, "ProduceRequest")
+    trace("Producer request " + request.toString)
    debug("kafka produce time " + (SystemTime.milliseconds - sTime) + " ms")
-    None
+    handleProducerRequest(request, "ProduceRequest") //already an Option[ProducerResponse], so no extra Some(...) wrapper
  }

  def handleMultiProducerRequest(receive: Receive): Option[Send] = {
@@ -64,25 +59,31 @@
    None
  }

-  private def handleProducerRequest(request: ProducerRequest, requestHandlerName: String) = {
-    val partition = request.getTranslatedPartition(logManager.chooseRandomPartition)
-    try {
-      logManager.getOrCreateLog(request.topic, partition).append(request.messages)
-      trace(request.messages.sizeInBytes + " bytes written to logs.")
-      request.messages.foreach(m => trace("wrote message %s to disk".format(m.message.checksum)))
-    }
-    catch {
-      case e =>
-        error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e)
-        e match {
-          case _: IOException =>
-            fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
-            Runtime.getRuntime.halt(1)
-          case _ =>
+  private def handleProducerRequest(request: ProducerRequest, requestHandlerName: String): Option[ProducerResponse] = {
+    request.data.foreach(d => {
+      d.partition_data.foreach(p => {
+        val partition = p.getTranslatedPartition(d.topic, logManager.chooseRandomPartition)
+        try {
+          logManager.getOrCreateLog(d.topic, partition).append(p.messages)
+          trace(p.messages.sizeInBytes + " bytes written to logs.")
+          p.messages.foreach(m => trace("wrote message %s to disk".format(m.message.checksum)))
         }
-        throw e
-    }
-    None
+        catch {
+          case e =>
+            //TODO: handle response in ProducerResponse
+            error("Error processing " + requestHandlerName + " on " + d.topic + ":" + partition, e)
+            e match {
+              case _: IOException =>
+                fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
+                Runtime.getRuntime.halt(1)
+              case _ =>
+            }
+            //throw e
+        }
+      })
+    })
+    Some(new ProducerResponse()) //TODO: KAFKA-240 populate per-partition results
  }

  def handleFetchRequest(request: Receive): Option[Send] = {
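The response object is still a stub. A hypothetical shape it might take (not part of this patch), mirroring the request's nested topic/partition layout so each appended message set can report an error code and an offset:

//hypothetical ProducerResponse sketch; field names and types are assumptions
class ProducerResponse(val version_id: Short,
                       val correlation_id: Int,
                       val errors: Array[Short],  //one error code per partition, in request order
                       val offsets: Array[Long])  //offset assigned to each appended message set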
Index: core/src/main/scala/kafka/api/ProducerRequest.scala
===================================================================
--- core/src/main/scala/kafka/api/ProducerRequest.scala (revision 1234442)
+++ core/src/main/scala/kafka/api/ProducerRequest.scala (working copy)
@@ -22,47 +22,112 @@
 import kafka.network._
 import kafka.utils._

+object WiredFormat {
+  val version_id: Short = 0
+}
+
+class WiredTopic(val topic: String, val partition_data: Array[WiredPartition])
+
+class WiredPartition(val partition: Int, val messages: ByteBufferMessageSet) {
+  def getTranslatedPartition(topic: String, randomSelector: String => Int): Int = {
+    if (partition == ProducerRequest.RandomPartition)
+      randomSelector(topic)
+    else
+      partition
+  }
+}
+
 object ProducerRequest {
   val RandomPartition = -1

   def readFrom(buffer: ByteBuffer): ProducerRequest = {
-    val topic = Utils.readShortString(buffer, "UTF-8")
-    val partition = buffer.getInt
-    val messageSetSize = buffer.getInt
-    val messageSetBuffer = buffer.slice()
-    messageSetBuffer.limit(messageSetSize)
-    buffer.position(buffer.position + messageSetSize)
-    new ProducerRequest(topic, partition, new ByteBufferMessageSet(messageSetBuffer))
+    val version_id: Short = buffer.getShort
+    val correlation_id: Int = buffer.getInt
+    val client_id: String = Utils.readShortString(buffer, "UTF-8")
+    val required_acks: Short = buffer.getShort
+    val ack_timeout: Int = buffer.getInt
+    //build the topic structure
+    val topicCount = buffer.getInt
+    val data = new Array[WiredTopic](topicCount)
+    for(i <- 0 until topicCount) {
+      val topic = Utils.readShortString(buffer, "UTF-8")
+
+      val partitionCount = buffer.getInt
+      //build the partition structure within this topic
+      val partition_data = new Array[WiredPartition](partitionCount)
+      for (j <- 0 until partitionCount) {
+        val partition = buffer.getInt
+        val messageSetSize = buffer.getInt
+        val messageSetBuffer = new Array[Byte](messageSetSize)
+        buffer.get(messageSetBuffer, 0, messageSetSize) //read exactly the advertised number of bytes
+        partition_data(j) = new WiredPartition(partition, new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer)))
+      }
+      data(i) = new WiredTopic(topic, partition_data)
+    }
+    new ProducerRequest(correlation_id, client_id, required_acks, ack_timeout, data)
  }
}

-class ProducerRequest(val topic: String,
-                      val partition: Int,
-                      val messages: ByteBufferMessageSet) extends Request(RequestKeys.Produce) {
+class ProducerRequest(val correlation_id: Int,
+                      val client_id: String,
+                      val required_acks: Short,
+                      val ack_timeout: Int,
+                      val data: Array[WiredTopic]) extends Request(RequestKeys.Produce) {
+  val version_id: Short = WiredFormat.version_id
+
  def writeTo(buffer: ByteBuffer) {
-    Utils.writeShortString(buffer, topic, "UTF-8")
-    buffer.putInt(partition)
-    buffer.putInt(messages.serialized.limit)
-    buffer.put(messages.serialized)
-    messages.serialized.rewind
+    buffer.putShort(version_id)
+    buffer.putInt(correlation_id)
+    Utils.writeShortString(buffer, client_id, "UTF-8")
+    buffer.putShort(required_acks)
+    buffer.putInt(ack_timeout)
+    //save the topic structure
+    buffer.putInt(data.size) //the number of topics
+    data.foreach(d => {
+      Utils.writeShortString(buffer, d.topic, "UTF-8") //write the topic
+      buffer.putInt(d.partition_data.size) //the number of partitions
+      d.partition_data.foreach(p => {
+        buffer.putInt(p.partition)
+        buffer.putInt(p.messages.serialized.limit)
+        buffer.put(p.messages.serialized)
+        p.messages.serialized.rewind
+      })
+    })
  }

-  def sizeInBytes(): Int = 2 + topic.length + 4 + 4 + messages.sizeInBytes.asInstanceOf[Int]

-  def getTranslatedPartition(randomSelector: String => Int): Int = {
-    if (partition == ProducerRequest.RandomPartition)
-      return randomSelector(topic)
-    else
-      return partition
+  def sizeInBytes(): Int = {
+    //version_id, correlation_id, client_id (short string), required_acks, ack_timeout, topic count
+    var size = 2 + 4 + 2 + client_id.length + 2 + 4 + 4
+    data.foreach(d => {
+      size += 2 + d.topic.length
+      d.partition_data.foreach(p => {
+        size += 4 + 4 + p.messages.sizeInBytes.asInstanceOf[Int]
+      })
+    })
+    size
  }
+
  override def toString: String = {
    val builder = new StringBuilder()
    builder.append("ProducerRequest(")
-    builder.append(topic + ",")
-    builder.append(partition + ",")
-    builder.append(messages.sizeInBytes)
+    builder.append(version_id + ",")
+    builder.append(correlation_id + ",")
+    builder.append(client_id + ",")
+    builder.append(required_acks + ",")
+    builder.append(ack_timeout)
+    data.foreach(d => {
+      builder.append(":[" + d.topic)
+      d.partition_data.foreach(p => {
+        builder.append(":[")
+        builder.append(p.partition + ",")
+        builder.append(p.messages.sizeInBytes)
+        builder.append("]")
+      })
+      builder.append("]")
+    })
    builder.append(")")
    builder.toString
  }

@@ -70,14 +135,36 @@
  override def equals(other: Any): Boolean = {
    other match {
      case that: ProducerRequest =>
-        (that canEqual this) && topic == that.topic && partition == that.partition &&
-          messages.equals(that.messages)
+        if (!(that canEqual this))
+          return false
+        if (version_id != that.version_id || correlation_id != that.correlation_id ||
+            client_id != that.client_id || required_acks != that.required_acks ||
+            ack_timeout != that.ack_timeout || data.size != that.data.size)
+          return false
+        for(i <- 0 until data.size) {
+          if (data(i).topic != that.data(i).topic ||
+              data(i).partition_data.size != that.data(i).partition_data.size)
+            return false
+          for(j <- 0 until data(i).partition_data.size)
+            if (data(i).partition_data(j).partition != that.data(i).partition_data(j).partition ||
+                !data(i).partition_data(j).messages.equals(that.data(i).partition_data(j).messages))
+              return false
+        }
+        true
      case _ => false
    }
  }

  def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest]

-  override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode
-
+  override def hashCode: Int = {
+    def hcp(num: Int): Int = {
+      31 + (17 * num)
+    }
+    var hash = hcp(version_id) + hcp(correlation_id) + client_id.hashCode + hcp(required_acks) + hcp(ack_timeout)
+    data.foreach(d => {
+      hash += d.topic.hashCode
+      d.partition_data.foreach(p => {
+        hash += hcp(p.partition) + p.messages.hashCode
+      })
+    })
+    hash
+  }
}
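The serialized layout implied by writeTo/readFrom, summarized as a round-trip sketch (a minimal sketch assuming only the classes in this patch; buffer sizing relies on sizeInBytes above):

import java.nio.ByteBuffer
import kafka.api.ProducerRequest

//round-trip sketch: writeTo fills a buffer sized by sizeInBytes, readFrom parses it back.
//wire layout: version_id (int16), correlation_id (int32), client_id (int16-length string),
//required_acks (int16), ack_timeout (int32), topic count (int32), then per topic:
//name (int16-length string), partition count (int32), and per partition:
//partition id (int32), message-set size (int32), message-set bytes
def roundTrip(request: ProducerRequest): ProducerRequest = {
  val buffer = ByteBuffer.allocate(request.sizeInBytes)
  request.writeTo(buffer)
  buffer.rewind()
  ProducerRequest.readFrom(buffer)
}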
Index: core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala (revision 1234442)
+++ core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala (working copy)
@@ -30,18 +30,17 @@
    underlying.send(topic, partition, messages)
  }

-  def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic,
-                                                                       kafka.api.ProducerRequest.RandomPartition,
-                                                                       messages)
+  def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic, kafka.api.ProducerRequest.RandomPartition, messages)
+/*TODO: delete
  def multiSend(produces: Array[kafka.javaapi.ProducerRequest]) {
    import kafka.javaapi.Implicits._
    val produceRequests = new Array[kafka.api.ProducerRequest](produces.length)
    for(i <- 0 until produces.length)
-      produceRequests(i) = new kafka.api.ProducerRequest(produces(i).topic, produces(i).partition, produces(i).messages)
+      produceRequests(i) = produces(i).api()
    underlying.multiSend(produceRequests)
  }
-
+*/
  def close() {
    underlying.close
  }
Index: core/src/main/scala/kafka/javaapi/producer/Producer.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/producer/Producer.scala (revision 1234442)
+++ core/src/main/scala/kafka/javaapi/producer/Producer.scala (working copy)
@@ -99,8 +99,11 @@
   * synchronous or the asynchronous producer
   * @param producerData the producer data object that encapsulates the topic, key and message data
   */
+  var correlation_id: Int = 0
+
  def send(producerData: kafka.javaapi.producer.ProducerData[K,V]) {
    import collection.JavaConversions._
+    underlying.correlation_id = correlation_id
    underlying.send(new kafka.producer.ProducerData[K,V](producerData.getTopic, producerData.getKey,
                                                         asBuffer(producerData.getData)))
  }
@@ -111,6 +114,7 @@
   */
  def send(producerData: java.util.List[kafka.javaapi.producer.ProducerData[K,V]]) {
    import collection.JavaConversions._
+    underlying.correlation_id = correlation_id
    underlying.send(asBuffer(producerData).map(pd => new kafka.producer.ProducerData[K,V](pd.getTopic, pd.getKey,
                                                                                          asBuffer(pd.getData))): _*)
  }
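A hypothetical client usage of the new field (assumes a configured kafka.javaapi.producer.Producer[String, String] named producer; topic and message values are illustrative):

//tag a send with request-specific metadata
producer.correlation_id = 42 //propagated into the wire request on the next send
producer.send(new kafka.javaapi.producer.ProducerData[String, String]("test-topic", "hello"))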
Index: core/src/main/scala/kafka/javaapi/ProducerRequest.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/ProducerRequest.scala (revision 1234442)
+++ core/src/main/scala/kafka/javaapi/ProducerRequest.scala (working copy)
@@ -17,36 +17,31 @@
 package kafka.javaapi

 import kafka.network.Request
-import kafka.api.RequestKeys
+import kafka.api.{RequestKeys, WiredTopic}
 import java.nio.ByteBuffer

-class ProducerRequest(val topic: String,
-                      val partition: Int,
-                      val messages: kafka.javaapi.message.ByteBufferMessageSet) extends Request(RequestKeys.Produce) {
+class ProducerRequest(val correlation_id: Int,
+                      val client_id: String,
+                      val required_acks: Short,
+                      val ack_timeout: Int,
+                      val data: Array[WiredTopic]) extends Request(RequestKeys.Produce) {
+
  import Implicits._
-  private val underlying = new kafka.api.ProducerRequest(topic, partition, messages)
+  private val underlying = new kafka.api.ProducerRequest(correlation_id, client_id, required_acks, ack_timeout, data)
+  def api(): kafka.api.ProducerRequest = underlying
+
  def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) }

  def sizeInBytes(): Int = underlying.sizeInBytes

-  def getTranslatedPartition(randomSelector: String => Int): Int =
-    underlying.getTranslatedPartition(randomSelector)
-
  override def toString: String =
    underlying.toString

-  override def equals(other: Any): Boolean = {
-    other match {
-      case that: ProducerRequest =>
-        (that canEqual this) && topic == that.topic && partition == that.partition &&
-          messages.equals(that.messages)
-      case _ => false
-    }
-  }
+  override def equals(other: Any): Boolean = other match {
+    case that: ProducerRequest => underlying.equals(that.api()) //compare the wrapped requests, not wrapper against core class
+    case _ => false
+  }

  def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest]

-  override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode
+  override def hashCode: Int = underlying.hashCode
}