Created
February 9, 2012 09:30
-
-
Save joestein/1778740 to your computer and use it in GitHub Desktop.
failing test kafka240
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Index: core/src/test/scala/unit/kafka/utils/TestUtils.scala | |
=================================================================== | |
--- core/src/test/scala/unit/kafka/utils/TestUtils.scala (revision 1234442) | |
+++ core/src/test/scala/unit/kafka/utils/TestUtils.scala (working copy) | |
@@ -29,6 +29,7 @@ | |
import kafka.message._ | |
import org.I0Itec.zkclient.ZkClient | |
import kafka.consumer.ConsumerConfig | |
+import kafka.api.{WiredTopic, WiredPartition} | |
/** | |
* Utility functions to help with testing | |
@@ -301,6 +302,24 @@ | |
} | |
} | |
+ /** | |
+ * Create a wired format request based on simple basic information | |
+ */ | |
+ def produceWiredRequest(topic: String, partition: Int, message: ByteBufferMessageSet): kafka.javaapi.ProducerRequest = { | |
+ produceWiredRequest(-1,topic,partition,message) | |
+ } | |
+ | |
+ def produceWiredRequest(correlation_id: Int, topic: String, partition: Int, message: ByteBufferMessageSet): kafka.javaapi.ProducerRequest = { | |
+ val client_id = "TODO" //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from | |
+ val required_acks: Short = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from | |
+ val ack_timeout = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from | |
+ var data = new Array[WiredTopic](1) | |
+ var partition_data = new Array[WiredPartition](1) | |
+ partition_data(0) = new WiredPartition(partition,message) | |
+ data(0) = new WiredTopic(topic,partition_data) | |
+ val producerRequest = new kafka.javaapi.ProducerRequest(correlation_id, client_id, required_acks, ack_timeout, data) | |
+ producerRequest | |
+ } | |
} | |
object TestZKUtils { | |
Index: core/src/test/scala/unit/kafka/integration/FetcherTest.scala | |
=================================================================== | |
--- core/src/test/scala/unit/kafka/integration/FetcherTest.scala (revision 1234442) | |
+++ core/src/test/scala/unit/kafka/integration/FetcherTest.scala (working copy) | |
@@ -31,6 +31,8 @@ | |
class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { | |
+ var correlation_id: Int = -1 | |
+ | |
val numNodes = 2 | |
val configs = | |
for(props <- TestUtils.createBrokerConfigs(numNodes)) | |
Index: core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala | |
=================================================================== | |
--- core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (revision 1234442) | |
+++ core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (working copy) | |
@@ -19,7 +19,7 @@ | |
import scala.collection._ | |
import junit.framework.Assert._ | |
-import kafka.api.{ProducerRequest, FetchRequest} | |
+import kafka.api.{ProducerRequest, FetchRequest, WiredTopic, WiredPartition} | |
import kafka.common.{OffsetOutOfRangeException, InvalidPartitionException} | |
import kafka.server.{KafkaRequestHandlers, KafkaConfig} | |
import org.apache.log4j.{Level, Logger} | |
@@ -216,16 +216,25 @@ | |
val topics = List("test1", "test2", "test3"); | |
val messages = new mutable.HashMap[String, ByteBufferMessageSet] | |
val fetches = new mutable.ArrayBuffer[FetchRequest] | |
- var produceList: List[ProducerRequest] = Nil | |
+ val client_id = "TODO" //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from | |
+ val required_acks: Short = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from | |
+ val ack_timeout = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from | |
+ var data = new Array[WiredTopic](3) | |
+ var index = 0 | |
for(topic <- topics) { | |
val set = new ByteBufferMessageSet(NoCompressionCodec, | |
new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) | |
messages += topic -> set | |
- produceList ::= new ProducerRequest(topic, 0, set) | |
+ var partition_data = new Array[WiredPartition](1) | |
+ partition_data(0) = new WiredPartition(0,set) | |
+ data(index) = new WiredTopic(topic,partition_data) | |
+ index += 1 | |
fetches += new FetchRequest(topic, 0, 0, 10000) | |
} | |
- producer.multiSend(produceList.toArray) | |
+ val producerRequest = new kafka.javaapi.ProducerRequest(-1, client_id, required_acks, ack_timeout, data) | |
+ producer.send(producerRequest) | |
+ | |
for (messageSet <- messages.values) | |
messageSet.getBuffer.rewind | |
@@ -241,16 +250,25 @@ | |
val topics = List("test1", "test2", "test3"); | |
val messages = new mutable.HashMap[String, ByteBufferMessageSet] | |
val fetches = new mutable.ArrayBuffer[FetchRequest] | |
- var produceList: List[ProducerRequest] = Nil | |
+ val client_id = "TODO" //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from | |
+ val required_acks: Short = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from | |
+ val ack_timeout = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from | |
+ var data = new Array[WiredTopic](3) | |
+ var index = 0 | |
for(topic <- topics) { | |
val set = new ByteBufferMessageSet(DefaultCompressionCodec, | |
new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) | |
messages += topic -> set | |
- produceList ::= new ProducerRequest(topic, 0, set) | |
+ var partition_data = new Array[WiredPartition](1) | |
+ partition_data(0) = new WiredPartition(0,set) | |
+ data(index) = new WiredTopic(topic,partition_data) | |
+ index += 1 | |
fetches += new FetchRequest(topic, 0, 0, 10000) | |
} | |
- producer.multiSend(produceList.toArray) | |
+ val producerRequest = new kafka.javaapi.ProducerRequest(-1, client_id, required_acks, ack_timeout, data) | |
+ producer.send(producerRequest) | |
+ | |
for (messageSet <- messages.values) | |
messageSet.getBuffer.rewind | |
Index: core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala | |
=================================================================== | |
--- core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (revision 1234442) | |
+++ core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (working copy) | |
@@ -20,7 +20,7 @@ | |
import scala.collection._ | |
import junit.framework.Assert._ | |
import kafka.common.OffsetOutOfRangeException | |
-import kafka.api.{ProducerRequest, FetchRequest} | |
+import kafka.api.{ProducerRequest, FetchRequest, WiredTopic, WiredPartition} | |
import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig} | |
import org.apache.log4j.{Level, Logger} | |
import org.scalatest.junit.JUnit3Suite | |
@@ -132,16 +132,25 @@ | |
val topics = List("test1", "test2", "test3"); | |
val messages = new mutable.HashMap[String, ByteBufferMessageSet] | |
val fetches = new mutable.ArrayBuffer[FetchRequest] | |
- var produceList: List[ProducerRequest] = Nil | |
+ val client_id = "TODO" //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from | |
+ val required_acks: Short = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from | |
+ val ack_timeout = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from | |
+ var data = new Array[WiredTopic](3) | |
+ var index = 0 | |
for(topic <- topics) { | |
val set = new ByteBufferMessageSet(NoCompressionCodec, | |
new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) | |
messages += topic -> set | |
- produceList ::= new ProducerRequest(topic, 0, set) | |
+ var partition_data = new Array[WiredPartition](1) | |
+ partition_data(0) = new WiredPartition(0,set) | |
+ data(index) = new WiredTopic(topic,partition_data) | |
+ index += 1 | |
fetches += new FetchRequest(topic, 0, 0, 10000) | |
} | |
- producer.multiSend(produceList.toArray) | |
+ val producerRequest = new kafka.javaapi.ProducerRequest(-1, client_id, required_acks, ack_timeout, data) | |
+ producer.send(producerRequest) | |
+ | |
for (messageSet <- messages.values) | |
messageSet.getBuffer.rewind | |
@@ -157,18 +166,27 @@ | |
val topics = List("test1", "test2", "test3"); | |
val messages = new mutable.HashMap[String, ByteBufferMessageSet] | |
val fetches = new mutable.ArrayBuffer[FetchRequest] | |
- var produceList: List[ProducerRequest] = Nil | |
+ val client_id = "TODO" //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from | |
+ val required_acks: Short = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from | |
+ val ack_timeout = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from | |
+ var data = new Array[WiredTopic](3) | |
+ var index = 0 | |
for(topic <- topics) { | |
val set = new ByteBufferMessageSet(NoCompressionCodec, | |
new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) | |
messages += topic -> set | |
- produceList ::= new ProducerRequest(topic, 0, set) | |
+ var partition_data = new Array[WiredPartition](1) | |
+ partition_data(0) = new WiredPartition(0,set) | |
+ data(index) = new WiredTopic(topic,partition_data) | |
+ index += 1 | |
fetches += new FetchRequest(topic, 0, 0, 10000) | |
} | |
- producer.multiSend(produceList.toArray) | |
+ val producerRequest = new kafka.javaapi.ProducerRequest(-1, client_id, required_acks, ack_timeout, data) | |
+ producer.send(producerRequest) | |
+ | |
// resend the same multisend | |
- producer.multiSend(produceList.toArray) | |
+ producer.send(producerRequest) | |
for (messageSet <- messages.values) | |
messageSet.getBuffer.rewind | |
Index: core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala | |
=================================================================== | |
--- core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (revision 1234442) | |
+++ core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (working copy) | |
@@ -77,11 +77,6 @@ | |
val secondEnd = SystemTime.milliseconds | |
Assert.assertTrue((secondEnd-secondStart) < 500) | |
- try { | |
- producer.multiSend(Array(new ProducerRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes))))) | |
- }catch { | |
- case e: Exception => failed=true | |
- } | |
Assert.assertFalse(failed) | |
} | |
Index: core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala | |
=================================================================== | |
--- core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (revision 1234442) | |
+++ core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (working copy) | |
@@ -27,6 +27,7 @@ | |
import kafka.producer.async._ | |
import kafka.serializer.Encoder | |
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message} | |
+import kafka.utils.TestUtils | |
class AsyncProducerTest extends JUnitSuite { | |
@@ -42,8 +43,8 @@ | |
@Test | |
def testProducerQueueSize() { | |
val basicProducer = EasyMock.createMock(classOf[SyncProducer]) | |
- basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition, | |
- getMessageSetOfSize(List(message1), 10))))) | |
+ basicProducer.send(TestUtils.produceWiredRequest(topic1, ProducerRequest.RandomPartition, | |
+ getMessageSetOfSize(List(message1), 10))) | |
EasyMock.expectLastCall | |
basicProducer.close | |
EasyMock.expectLastCall | |
@@ -80,8 +81,8 @@ | |
@Test | |
def testAddAfterQueueClosed() { | |
val basicProducer = EasyMock.createMock(classOf[SyncProducer]) | |
- basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition, | |
- getMessageSetOfSize(List(message1), 10))))) | |
+ basicProducer.send(TestUtils.produceWiredRequest(topic1, ProducerRequest.RandomPartition, | |
+ getMessageSetOfSize(List(message1), 10))) | |
EasyMock.expectLastCall | |
basicProducer.close | |
EasyMock.expectLastCall | |
@@ -114,11 +115,11 @@ | |
@Test | |
def testBatchSize() { | |
val basicProducer = EasyMock.createStrictMock(classOf[SyncProducer]) | |
- basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition, | |
- getMessageSetOfSize(List(message1), 5))))) | |
+ basicProducer.send(TestUtils.produceWiredRequest(topic1, ProducerRequest.RandomPartition, | |
+ getMessageSetOfSize(List(message1), 5))) | |
EasyMock.expectLastCall.times(2) | |
- basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition, | |
- getMessageSetOfSize(List(message1), 1))))) | |
+ basicProducer.send(TestUtils.produceWiredRequest(topic1, ProducerRequest.RandomPartition, | |
+ getMessageSetOfSize(List(message1), 1))) | |
EasyMock.expectLastCall | |
basicProducer.close | |
EasyMock.expectLastCall | |
@@ -155,8 +156,8 @@ | |
@Test | |
def testQueueTimeExpired() { | |
val basicProducer = EasyMock.createMock(classOf[SyncProducer]) | |
- basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition, | |
- getMessageSetOfSize(List(message1), 3))))) | |
+ basicProducer.send(TestUtils.produceWiredRequest(topic1, ProducerRequest.RandomPartition, | |
+ getMessageSetOfSize(List(message1), 3))) | |
EasyMock.expectLastCall | |
basicProducer.close | |
EasyMock.expectLastCall | |
@@ -211,10 +212,11 @@ | |
@Test | |
def testCollateEvents() { | |
val basicProducer = EasyMock.createMock(classOf[SyncProducer]) | |
- basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic2, ProducerRequest.RandomPartition, | |
+/*TODO: KAFKA-240 basicProducer.send(TestUtils.produceWiredRequest(topic2, ProducerRequest.RandomPartition, | |
getMessageSetOfSize(List(message2), 5)), | |
new ProducerRequest(topic1, ProducerRequest.RandomPartition, | |
getMessageSetOfSize(List(message1), 5))))) | |
+*/ | |
EasyMock.expectLastCall | |
basicProducer.close | |
EasyMock.expectLastCall | |
@@ -246,7 +248,7 @@ | |
@Test | |
def testCollateAndSerializeEvents() { | |
val basicProducer = EasyMock.createMock(classOf[SyncProducer]) | |
- basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic2, 1, | |
+/*TODO: KAFKA-240 basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic2, 1, | |
getMessageSetOfSize(List(message2), 5)), | |
new ProducerRequest(topic1, 0, | |
getMessageSetOfSize(List(message1), 5)), | |
@@ -254,7 +256,7 @@ | |
getMessageSetOfSize(List(message1), 5)), | |
new ProducerRequest(topic2, 0, | |
getMessageSetOfSize(List(message2), 5))))) | |
- | |
+*/ | |
EasyMock.expectLastCall | |
basicProducer.close | |
EasyMock.expectLastCall | |
@@ -304,8 +306,5 @@ | |
override def send(topic: String, messages: ByteBufferMessageSet): Unit = { | |
Thread.sleep(1000) | |
} | |
- override def multiSend(produces: Array[ProducerRequest]) { | |
- Thread.sleep(1000) | |
- } | |
} | |
} | |
Index: core/src/test/scala/unit/kafka/producer/ProducerTest.scala | |
=================================================================== | |
--- core/src/test/scala/unit/kafka/producer/ProducerTest.scala (revision 1234442) | |
+++ core/src/test/scala/unit/kafka/producer/ProducerTest.scala (working copy) | |
@@ -119,7 +119,7 @@ | |
val syncProducer1 = EasyMock.createMock(classOf[SyncProducer]) | |
val syncProducer2 = EasyMock.createMock(classOf[SyncProducer]) | |
// it should send to partition 0 (first partition) on second broker i.e broker2 | |
- syncProducer2.send(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("test1".getBytes))) | |
+ syncProducer2.send(TestUtils.produceWiredRequest(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("test1".getBytes)))) | |
EasyMock.expectLastCall | |
syncProducer1.close | |
EasyMock.expectLastCall | |
@@ -156,7 +156,7 @@ | |
val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]() | |
val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer]) | |
// it should send to a random partition due to use of broker.list | |
- syncProducer1.send(topic, -1, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("t".getBytes()))) | |
+ syncProducer1.send(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("test1".getBytes()))) | |
EasyMock.expectLastCall | |
syncProducer1.close | |
EasyMock.expectLastCall | |
@@ -241,7 +241,8 @@ | |
props.put("serializer.class", "kafka.producer.StringSerializer") | |
val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer, | |
syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]()) | |
- producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1"))) | |
+ val correlation_id = -1 | |
+ producerPool.send(correlation_id, producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1"))) | |
producerPool.close | |
EasyMock.verify(syncProducer1) | |
@@ -273,7 +274,8 @@ | |
props.put("producer.type", "async") | |
val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer, | |
new ConcurrentHashMap[Int, SyncProducer](), asyncProducers) | |
- producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1"))) | |
+ val correlation_id = -1 | |
+ producerPool.send(correlation_id, producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1"))) | |
producerPool.close | |
EasyMock.verify(asyncProducer1) | |
@@ -299,7 +301,8 @@ | |
val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer, | |
syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]()) | |
try { | |
- producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1"))) | |
+ val correlation_id = -1 | |
+ producerPool.send(correlation_id, producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1"))) | |
Assert.fail("Should fail with UnavailableProducerException") | |
}catch { | |
case e: UnavailableProducerException => // expected | |
@@ -330,7 +333,8 @@ | |
val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer, | |
new ConcurrentHashMap[Int, SyncProducer](), asyncProducers) | |
try { | |
- producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1"))) | |
+ val correlation_id = -1 | |
+ producerPool.send(correlation_id, producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1"))) | |
Assert.fail("Should fail with UnavailableProducerException") | |
}catch { | |
case e: UnavailableProducerException => // expected | |
Index: core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala | |
=================================================================== | |
--- core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (revision 1234442) | |
+++ core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (working copy) | |
@@ -31,6 +31,8 @@ | |
class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging { | |
+ var correlation_id: Int = -1 | |
+ | |
val zookeeperConnect = TestZKUtils.zookeeperConnect | |
val zkConnect = zookeeperConnect | |
val numNodes = 2 | |
@@ -256,7 +258,7 @@ | |
val mSet = new ByteBufferMessageSet(compressionCodec = compression, messages = ms: _*) | |
for (message <- ms) | |
messages ::= message | |
- producer.send(topic, partition, mSet) | |
+ producer.send(correlation_id, topic, partition, mSet) | |
} | |
producer.close() | |
messages | |
Index: core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala | |
=================================================================== | |
--- core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala (revision 1234442) | |
+++ core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala (working copy) | |
@@ -27,6 +27,8 @@ | |
import kafka.javaapi.ProducerRequest | |
import kafka.utils.TestUtils | |
import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message} | |
+import kafka.api.{WiredTopic, WiredPartition} | |
+import kafka.javaapi.Implicits._ | |
/** | |
* End to end tests of the primitive apis against a local server | |
@@ -342,16 +344,24 @@ | |
val topics = List("test1", "test2", "test3"); | |
val messages = new mutable.HashMap[String, ByteBufferMessageSet] | |
val fetches = new mutable.ArrayBuffer[FetchRequest] | |
- var produceList: List[ProducerRequest] = Nil | |
+ val client_id = "TODO" //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from | |
+ val required_acks: Short = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from | |
+ val ack_timeout = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from | |
+ var data = new Array[WiredTopic](3) | |
+ var index = 0 | |
for(topic <- topics) { | |
val set = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, | |
messages = getMessageList(new Message(("a_" + topic).getBytes), | |
new Message(("b_" + topic).getBytes))) | |
messages += topic -> set | |
- produceList ::= new ProducerRequest(topic, 0, set) | |
+ var partition_data = new Array[WiredPartition](1) | |
+ partition_data(0) = new WiredPartition(0,set) | |
+ data(index) = new WiredTopic(topic,partition_data) | |
+ index += 1 | |
fetches += new FetchRequest(topic, 0, 0, 10000) | |
} | |
- producer.multiSend(produceList.toArray) | |
+ val producerRequest = new kafka.javaapi.ProducerRequest(-1, client_id, required_acks, ack_timeout, data) | |
+ producer.send(producerRequest) | |
for (messageSet <- messages.values) | |
messageSet.getBuffer.rewind | |
@@ -375,16 +385,24 @@ | |
val topics = List("test1", "test2", "test3"); | |
val messages = new mutable.HashMap[String, ByteBufferMessageSet] | |
val fetches = new mutable.ArrayBuffer[FetchRequest] | |
- var produceList: List[ProducerRequest] = Nil | |
+ val client_id = "TODO" //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from | |
+ val required_acks: Short = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from | |
+ val ack_timeout = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from | |
+ var data = new Array[WiredTopic](3) | |
+ var index = 0 | |
for(topic <- topics) { | |
val set = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec, | |
messages = getMessageList(new Message(("a_" + topic).getBytes), | |
new Message(("b_" + topic).getBytes))) | |
messages += topic -> set | |
- produceList ::= new ProducerRequest(topic, 0, set) | |
+ var partition_data = new Array[WiredPartition](1) | |
+ partition_data(0) = new WiredPartition(0,set) | |
+ data(index) = new WiredTopic(topic,partition_data) | |
+ index += 1 | |
fetches += new FetchRequest(topic, 0, 0, 10000) | |
} | |
- producer.multiSend(produceList.toArray) | |
+ val producerRequest = new kafka.javaapi.ProducerRequest(-1, client_id, required_acks, ack_timeout, data) | |
+ producer.send(producerRequest) | |
for (messageSet <- messages.values) | |
messageSet.getBuffer.rewind | |
Index: core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala | |
=================================================================== | |
--- core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala (revision 1234442) | |
+++ core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala (working copy) | |
@@ -80,13 +80,6 @@ | |
val secondEnd = SystemTime.milliseconds | |
Assert.assertTrue((secondEnd-secondEnd) < 500) | |
- try { | |
- producer.multiSend(Array(new ProducerRequest("test", 0, | |
- new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, | |
- messages = getMessageList(new Message(messageBytes)))))) | |
- }catch { | |
- case e: Exception => failed=true | |
- } | |
Assert.assertFalse(failed) | |
} | |
Index: core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala | |
=================================================================== | |
--- core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala (revision 1234442) | |
+++ core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala (working copy) | |
@@ -130,7 +130,7 @@ | |
// it should send to partition 0 (first partition) on second broker i.e broker2 | |
val messageList = new java.util.ArrayList[Message] | |
messageList.add(new Message("test1".getBytes())) | |
- syncProducer2.send(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList)) | |
+ syncProducer2.send(TestUtils.produceWiredRequest(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList))) | |
EasyMock.expectLastCall | |
syncProducer1.close | |
EasyMock.expectLastCall | |
@@ -171,8 +171,8 @@ | |
val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer]) | |
// it should send to a random partition due to use of broker.list | |
val messageList = new java.util.ArrayList[Message] | |
- messageList.add(new Message("t".getBytes())) | |
- syncProducer1.send(topic, -1, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList)) | |
+ messageList.add(new Message("test1".getBytes())) | |
+ syncProducer1.send(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList)) | |
EasyMock.expectLastCall | |
syncProducer1.close | |
EasyMock.expectLastCall | |
@@ -235,7 +235,8 @@ | |
props.put("serializer.class", "kafka.producer.StringSerializer") | |
val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer, | |
syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]()) | |
- producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1"))) | |
+ val correlation_id = -1 | |
+ producerPool.send(correlation_id, producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1"))) | |
producerPool.close | |
EasyMock.verify(syncProducer1) | |
@@ -267,7 +268,8 @@ | |
props.put("producer.type", "async") | |
val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer, | |
new ConcurrentHashMap[Int, kafka.producer.SyncProducer](), asyncProducers) | |
- producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1"))) | |
+ val correlation_id = -1 | |
+ producerPool.send(correlation_id, producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1"))) | |
producerPool.close | |
EasyMock.verify(asyncProducer1) | |
@@ -293,7 +295,8 @@ | |
val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer, | |
syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]()) | |
try { | |
- producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1"))) | |
+ val correlation_id = -1 | |
+ producerPool.send(correlation_id,producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1"))) | |
Assert.fail("Should fail with UnavailableProducerException") | |
}catch { | |
case e: UnavailableProducerException => // expected | |
@@ -324,7 +327,8 @@ | |
val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer, | |
new ConcurrentHashMap[Int, kafka.producer.SyncProducer](), asyncProducers) | |
try { | |
- producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1"))) | |
+ val correlation_id = -1 | |
+ producerPool.send(correlation_id,producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1"))) | |
Assert.fail("Should fail with UnavailableProducerException") | |
}catch { | |
case e: UnavailableProducerException => // expected | |
Index: core/src/main/scala/kafka/producer/ProducerPool.scala | |
=================================================================== | |
--- core/src/main/scala/kafka/producer/ProducerPool.scala (revision 1234442) | |
+++ core/src/main/scala/kafka/producer/ProducerPool.scala (working copy) | |
@@ -26,6 +26,8 @@ | |
import kafka.common.{UnavailableProducerException, InvalidConfigException} | |
import kafka.utils.{Utils, Logging} | |
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet} | |
+import collection.mutable.{HashMap, ListBuffer} | |
+import kafka.api.{WiredTopic, WiredPartition} | |
class ProducerPool[V](private val config: ProducerConfig, | |
private val serializer: Encoder[V], | |
@@ -94,7 +96,7 @@ | |
* producer to publish the data to the specified broker partition | |
* @param poolData the producer pool request object | |
*/ | |
- def send(poolData: ProducerPoolData[V]*) { | |
+ def send(correlation_id: Int, poolData: ProducerPoolData[V]*) { | |
val distinctBrokers = poolData.map(pd => pd.getBidPid.brokerId).distinct | |
var remainingRequests = poolData.toSeq | |
distinctBrokers.foreach { bid => | |
@@ -102,18 +104,26 @@ | |
remainingRequests = requestsForThisBid._2 | |
if(sync) { | |
- val producerRequests = requestsForThisBid._1.map(req => new ProducerRequest(req.getTopic, req.getBidPid.partId, | |
- new ByteBufferMessageSet(compressionCodec = config.compressionCodec, | |
- messages = req.getData.map(d => serializer.toMessage(d)): _*))) | |
+ val client_id = "TODO" //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from | |
+ val required_acks: Short = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from | |
+ val ack_timeout = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from | |
+ val topics = new HashMap[String, ListBuffer[WiredPartition]]() | |
+ requestsForThisBid._1.map(req => { | |
+ val topic = topics.get(req.getTopic) // checking to see if this topics exists | |
+ topic match { | |
+ case None => topics += req.getTopic -> new ListBuffer[WiredPartition]() //create a new listbuffer for this topic | |
+ case Some(x) => trace("found " + req.getTopic) | |
+ } | |
+ topics += req.getTopic -> new ListBuffer[WiredPartition]() | |
+ topics(req.getTopic).append(new WiredPartition(req.getBidPid.partId, new ByteBufferMessageSet(compressionCodec = config.compressionCodec, | |
+ messages = req.getData.map(d => serializer.toMessage(d)): _*))) | |
+ }) | |
+ val wired_topics = topics.map(kv => new WiredTopic(kv._1,kv._2.toArray)) | |
+ val producerRequest = new kafka.javaapi.ProducerRequest(correlation_id, client_id, required_acks, ack_timeout, wired_topics.toArray) | |
debug("Fetching sync producer for broker id: " + bid) | |
val producer = syncProducers.get(bid) | |
- if(producer != null) { | |
- if(producerRequests.size > 1) | |
- producer.multiSend(producerRequests.toArray) | |
- else | |
- producer.send(topic = producerRequests(0).topic, | |
- partition = producerRequests(0).partition, | |
- messages = producerRequests(0).messages) | |
+ if(producer != null) { | |
+ producer.send(producerRequest) | |
config.compressionCodec match { | |
case NoCompressionCodec => debug("Sending message to broker " + bid) | |
case _ => debug("Sending compressed messages to broker " + bid) | |
Index: core/src/main/scala/kafka/producer/SyncProducer.scala | |
=================================================================== | |
--- core/src/main/scala/kafka/producer/SyncProducer.scala (revision 1234442) | |
+++ core/src/main/scala/kafka/producer/SyncProducer.scala (working copy) | |
@@ -50,7 +50,8 @@ | |
if (logger.isTraceEnabled) { | |
trace("verifying sendbuffer of size " + buffer.limit) | |
val requestTypeId = buffer.getShort() | |
- if (requestTypeId == RequestKeys.MultiProduce) { | |
+ //TODO: KAFKA-240 need to understand purpose of this | |
+ /* if (requestTypeId == RequestKeys.MultiProduce) { | |
try { | |
val request = MultiProducerRequest.readFrom(buffer) | |
for (produce <- request.produces) { | |
@@ -69,7 +70,7 @@ | |
case e: Throwable => | |
trace("error verifying sendbuffer ", e) | |
} | |
- } | |
+ }*/ | |
} | |
} | |
@@ -107,15 +108,37 @@ | |
/** | |
* Send a message | |
*/ | |
+ def send(producerRequest: kafka.javaapi.ProducerRequest) { | |
+ producerRequest.data.foreach(d => { | |
+ d.partition_data.foreach(p => { | |
+ verifyMessageSize(p.messages) | |
+ val setSize = p.messages.sizeInBytes.asInstanceOf[Int] | |
+ trace("Got message set with " + setSize + " bytes to send") | |
+ }) | |
+ }) | |
+ send(new BoundedByteBufferSend(producerRequest)) | |
+ } | |
+ | |
def send(topic: String, partition: Int, messages: ByteBufferMessageSet) { | |
- verifyMessageSize(messages) | |
- val setSize = messages.sizeInBytes.asInstanceOf[Int] | |
- trace("Got message set with " + setSize + " bytes to send") | |
- send(new BoundedByteBufferSend(new ProducerRequest(topic, partition, messages))) | |
+ send(-1,topic,partition,messages) | |
} | |
- | |
- def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic, ProducerRequest.RandomPartition, messages) | |
+ | |
+ def send(topic: String, messages: ByteBufferMessageSet) { | |
+ send(-1,topic,ProducerRequest.RandomPartition,messages) | |
+ } | |
+ def send(correlation_id: Int, topic: String, partition: Int, messages: ByteBufferMessageSet) { | |
+ val client_id = "TODO" //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from | |
+ val required_acks: Short = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from | |
+ val ack_timeout = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from | |
+ var data = new Array[WiredTopic](1) | |
+ var partition_data = new Array[WiredPartition](1) | |
+ partition_data(0) = new WiredPartition(partition,messages) | |
+ data(0) = new WiredTopic(topic,partition_data) | |
+ val producerRequest = new kafka.javaapi.ProducerRequest(correlation_id, client_id, required_acks, ack_timeout, data) | |
+ send(producerRequest) | |
+ } | |
+/*TODO: delete | |
def multiSend(produces: Array[ProducerRequest]) { | |
for (request <- produces) | |
verifyMessageSize(request.messages) | |
@@ -123,7 +146,7 @@ | |
trace("Got multi message sets with " + setSize + " bytes to send") | |
send(new BoundedByteBufferSend(new MultiProducerRequest(produces))) | |
} | |
- | |
+*/ | |
def close() = { | |
lock synchronized { | |
disconnect() | |
Index: core/src/main/scala/kafka/producer/Producer.scala | |
=================================================================== | |
--- core/src/main/scala/kafka/producer/Producer.scala (revision 1234442) | |
+++ core/src/main/scala/kafka/producer/Producer.scala (working copy) | |
@@ -93,6 +93,8 @@ | |
this(config, if(partitioner == null) new DefaultPartitioner[K] else partitioner, | |
new ProducerPool[V](config, encoder, eventHandler, cbkHandler), true, null) | |
+ var correlation_id: Int = -1 //allows the client to send request specific information through the producer | |
+ | |
/** | |
* Sends the data, partitioned by key to the topic using either the | |
* synchronous or the asynchronous producer | |
@@ -138,7 +140,7 @@ | |
new Partition(brokerIdPartition.get.brokerId, brokerIdPartition.get.partId), | |
pd.getData) | |
} | |
- producerPool.send(producerPoolRequests: _*) | |
+ producerPool.send(correlation_id, producerPoolRequests: _*) | |
} | |
private def configSend(producerData: ProducerData[K,V]*) { | |
@@ -160,7 +162,7 @@ | |
new Partition(brokerIdPartition.brokerId, partition), | |
pd.getData) | |
} | |
- producerPool.send(producerPoolRequests: _*) | |
+ producerPool.send(correlation_id, producerPoolRequests: _*) | |
} | |
private def getPartitionListForTopic(pd: ProducerData[K,V]): Seq[Partition] = { | |
Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala | |
=================================================================== | |
--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (revision 1234442) | |
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (working copy) | |
@@ -46,8 +46,9 @@ | |
private def send(messagesPerTopic: Map[(String, Int), ByteBufferMessageSet], syncProducer: SyncProducer) { | |
if(messagesPerTopic.size > 0) { | |
- val requests = messagesPerTopic.map(f => new ProducerRequest(f._1._1, f._1._2, f._2)).toArray | |
- syncProducer.multiSend(requests) | |
+ //TODO KAFKA-240 | |
+ //val requests = messagesPerTopic.map(f => new ProducerRequest(f._1._1, f._1._2, f._2)).toArray | |
+ //syncProducer.multiSend(requests) | |
trace("kafka producer sent messages for topics %s to broker %s:%d" | |
.format(messagesPerTopic, syncProducer.config.host, syncProducer.config.port)) | |
} | |
Index: core/src/main/scala/kafka/server/KafkaRequestHandlers.scala | |
=================================================================== | |
--- core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (revision 1234442) | |
+++ core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (working copy) | |
@@ -32,8 +32,6 @@ | |
*/ | |
private[kafka] class KafkaRequestHandlers(val logManager: LogManager) extends Logging { | |
- private val requestLogger = Logger.getLogger("kafka.request.logger") | |
- | |
def handlerFor(requestTypeId: Short, request: Receive): Handler.Handler = { | |
requestTypeId match { | |
case RequestKeys.Produce => handleProducerRequest _ | |
@@ -48,55 +46,59 @@ | |
def handleProducerRequest(receive: Receive): Option[Send] = { | |
val sTime = SystemTime.milliseconds | |
val request = ProducerRequest.readFrom(receive.buffer) | |
- | |
- if(requestLogger.isTraceEnabled) | |
- requestLogger.trace("Producer request " + request.toString) | |
- handleProducerRequest(request, "ProduceRequest") | |
+ trace("Producer request " + request.toString) | |
debug("kafka produce time " + (SystemTime.milliseconds - sTime) + " ms") | |
- None | |
+ Some(handleProducerRequest(request, "ProduceRequest")) | |
} | |
def handleMultiProducerRequest(receive: Receive): Option[Send] = { | |
val request = MultiProducerRequest.readFrom(receive.buffer) | |
- if(requestLogger.isTraceEnabled) | |
- requestLogger.trace("Multiproducer request " + request.toString) | |
+ trace("Multiproducer request " + request.toString) | |
request.produces.map(handleProducerRequest(_, "MultiProducerRequest")) | |
None | |
} | |
- private def handleProducerRequest(request: ProducerRequest, requestHandlerName: String) = { | |
- val partition = request.getTranslatedPartition(logManager.chooseRandomPartition) | |
- try { | |
- logManager.getOrCreateLog(request.topic, partition).append(request.messages) | |
- trace(request.messages.sizeInBytes + " bytes written to logs.") | |
- request.messages.foreach(m => trace("wrote message %s to disk".format(m.message.checksum))) | |
- } | |
- catch { | |
- case e => | |
- error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e) | |
- e match { | |
- case _: IOException => | |
- fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e) | |
- Runtime.getRuntime.halt(1) | |
- case _ => | |
+ private def handleProducerRequest(request: ProducerRequest, requestHandlerName: String): ProducerResponse = { | |
+ val requestSize = request.data.size | |
+ val errors = new Array[Int](requestSize) | |
+ val offsets = new Array[Long](requestSize) | |
+ | |
+ request.data.foreach(d => { | |
+ d.partition_data.foreach(p => { | |
+ val partition = p.getTranslatedPartition(d.topic, logManager.chooseRandomPartition) | |
+ try { | |
+ logManager.getOrCreateLog(d.topic, partition).append(p.messages) | |
+ trace(p.messages.sizeInBytes + " bytes written to logs.") | |
+ p.messages.foreach(m => trace("wrote message %s to disk".format(m.message.checksum))) | |
} | |
- throw e | |
- } | |
- None | |
+ catch { | |
+ case e => | |
+ //TODO: handle response in ProducerResponse | |
+ error("Error processing " + requestHandlerName + " on " + d.topic + ":" + partition, e) | |
+ e match { | |
+ case _: IOException => | |
+ fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e) | |
+ Runtime.getRuntime.halt(1) | |
+ case _ => | |
+ } | |
+ //throw e | |
+ } | |
+ }) | |
+ //None | |
+ }) | |
+ new ProducerResponse(request.correlation_id, request.version_id, errors, offsets) | |
} | |
def handleFetchRequest(request: Receive): Option[Send] = { | |
val fetchRequest = FetchRequest.readFrom(request.buffer) | |
- if(requestLogger.isTraceEnabled) | |
- requestLogger.trace("Fetch request " + fetchRequest.toString) | |
+ trace("Fetch request " + fetchRequest.toString) | |
Some(readMessageSet(fetchRequest)) | |
} | |
def handleMultiFetchRequest(request: Receive): Option[Send] = { | |
val multiFetchRequest = MultiFetchRequest.readFrom(request.buffer) | |
- if(requestLogger.isTraceEnabled) | |
- requestLogger.trace("Multifetch request") | |
- multiFetchRequest.fetches.foreach(req => requestLogger.trace(req.toString)) | |
+ trace("Multifetch request") | |
+ multiFetchRequest.fetches.foreach(req => trace(req.toString)) | |
var responses = multiFetchRequest.fetches.map(fetch => | |
readMessageSet(fetch)).toList | |
@@ -123,8 +125,7 @@ | |
def handleOffsetRequest(request: Receive): Option[Send] = { | |
val offsetRequest = OffsetRequest.readFrom(request.buffer) | |
- if(requestLogger.isTraceEnabled) | |
- requestLogger.trace("Offset request " + offsetRequest.toString) | |
+ trace("Offset request " + offsetRequest.toString) | |
val offsets = logManager.getOffsets(offsetRequest) | |
val response = new OffsetArraySend(offsets) | |
Some(response) | |
Index: core/src/main/scala/kafka/api/ProducerRequest.scala | |
=================================================================== | |
--- core/src/main/scala/kafka/api/ProducerRequest.scala (revision 1234442) | |
+++ core/src/main/scala/kafka/api/ProducerRequest.scala (working copy) | |
@@ -22,47 +22,112 @@ | |
import kafka.network._ | |
import kafka.utils._ | |
+object WiredFormat { | |
+ val version_id: Short = 0 | |
+} | |
+ | |
+class WiredTopic(val topic: String, val partition_data: Array[WiredPartition]) | |
+ | |
+class WiredPartition(val partition: Int, val messages: ByteBufferMessageSet) { | |
+ def getTranslatedPartition(topic: String, randomSelector: String => Int): Int = { | |
+ if (partition == ProducerRequest.RandomPartition) | |
+ return randomSelector(topic) | |
+ else | |
+ return partition | |
+ } | |
+} | |
+ | |
object ProducerRequest { | |
val RandomPartition = -1 | |
- | |
+ | |
def readFrom(buffer: ByteBuffer): ProducerRequest = { | |
- val topic = Utils.readShortString(buffer, "UTF-8") | |
- val partition = buffer.getInt | |
- val messageSetSize = buffer.getInt | |
- val messageSetBuffer = buffer.slice() | |
- messageSetBuffer.limit(messageSetSize) | |
- buffer.position(buffer.position + messageSetSize) | |
- new ProducerRequest(topic, partition, new ByteBufferMessageSet(messageSetBuffer)) | |
+ val version_id: Short = buffer.getShort | |
+ val correlation_id: Int = buffer.getInt | |
+ val client_id: String = Utils.readShortString(buffer, "UTF-8") | |
+ val required_acks: Short = buffer.getShort | |
+ val ack_timeout: Int = buffer.getInt | |
+ //build the topic structure | |
+ val topicCount = buffer.getInt | |
+ val data = new Array[WiredTopic](topicCount) | |
+ for(i <- 0 until topicCount) { | |
+ val topic = Utils.readShortString(buffer, "UTF-8") | |
+ | |
+ val partitionCount = buffer.getInt | |
+ //build the partition structure within this topic | |
+ val partition_data = new Array[WiredPartition](partitionCount) | |
+ for (j <- 0 until partitionCount) { | |
+ val partition = buffer.getInt | |
+ val messageSetSize = buffer.getInt | |
+ val messageSetBuffer = new Array[Byte](messageSetSize) | |
+ buffer.get(messageSetBuffer,0,messageSetSize) | |
+ partition_data(j) = new WiredPartition(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer))) | |
+ } | |
+ data(i) = new WiredTopic(topic,partition_data) | |
+ } | |
+ new ProducerRequest(correlation_id,client_id,required_acks,ack_timeout,data) | |
} | |
} | |
-class ProducerRequest(val topic: String, | |
- val partition: Int, | |
- val messages: ByteBufferMessageSet) extends Request(RequestKeys.Produce) { | |
+class ProducerRequest(val correlation_id: Int, | |
+ val client_id: String, | |
+ val required_acks: Short, | |
+ val ack_timeout: Int, | |
+ val data: Array[WiredTopic]) extends Request(RequestKeys.Produce) { | |
+ val version_id: Short = WiredFormat.version_id | |
+ | |
def writeTo(buffer: ByteBuffer) { | |
- Utils.writeShortString(buffer, topic, "UTF-8") | |
- buffer.putInt(partition) | |
- buffer.putInt(messages.serialized.limit) | |
- buffer.put(messages.serialized) | |
- messages.serialized.rewind | |
+ buffer.putShort(version_id) | |
+ buffer.putInt(correlation_id) | |
+ Utils.writeShortString(buffer, client_id, "UTF-8") | |
+ buffer.putShort(required_acks) | |
+ buffer.putInt(ack_timeout) | |
+ //save the topic structure | |
+ buffer.putInt(data.size) //the number of topics | |
+ data.foreach(d =>{ | |
+ Utils.writeShortString(buffer, d.topic, "UTF-8") //write the topic | |
+ buffer.putInt(d.partition_data.size) //the number of partitions | |
+ d.partition_data.foreach(p => { | |
+ buffer.putInt(p.partition) | |
+ buffer.putInt(p.messages.serialized.limit) | |
+ buffer.put(p.messages.serialized) | |
+ p.messages.serialized.rewind | |
+ }) | |
+ }) | |
} | |
- | |
- def sizeInBytes(): Int = 2 + topic.length + 4 + 4 + messages.sizeInBytes.asInstanceOf[Int] | |
- def getTranslatedPartition(randomSelector: String => Int): Int = { | |
- if (partition == ProducerRequest.RandomPartition) | |
- return randomSelector(topic) | |
- else | |
- return partition | |
+ def sizeInBytes(): Int = { | |
+ var size = 0 | |
+ //size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout, data.size | |
+ size = 2 + 4 + 2 + client_id.length + 2 + 4 + 4; | |
+ data.foreach(d =>{ | |
+ size += 2 + d.topic.length + 4 | |
+ d.partition_data.foreach(p => { | |
+ size += 4 + 4 + p.messages.sizeInBytes.asInstanceOf[Int] | |
+ }) | |
+ }) | |
+ size | |
} | |
+ | |
override def toString: String = { | |
val builder = new StringBuilder() | |
builder.append("ProducerRequest(") | |
- builder.append(topic + ",") | |
- builder.append(partition + ",") | |
- builder.append(messages.sizeInBytes) | |
+ builder.append(version_id + ",") | |
+ builder.append(correlation_id + ",") | |
+ builder.append(client_id + ",") | |
+ builder.append(required_acks + ",") | |
+ builder.append(ack_timeout) | |
+ data.foreach(d =>{ | |
+ builder.append(":[" + d.topic) | |
+ d.partition_data.foreach(p => { | |
+ builder.append(":[") | |
+ builder.append(p.partition + ",") | |
+ builder.append(p.messages.sizeInBytes) | |
+ builder.append("]") | |
+ }) | |
+ builder.append("]") | |
+ }) | |
builder.append(")") | |
builder.toString | |
} | |
@@ -70,14 +135,36 @@ | |
override def equals(other: Any): Boolean = { | |
other match { | |
case that: ProducerRequest => | |
- (that canEqual this) && topic == that.topic && partition == that.partition && | |
- messages.equals(that.messages) | |
+ if (that canEqual this) | |
+ if (version_id == that.version_id && correlation_id == that.correlation_id && | |
+ client_id == that.client_id && required_acks == that.required_acks && ack_timeout == that.ack_timeout) { | |
+ for(i <- 0 until data.size) { | |
+ if (data(i).topic != that.data(i).topic) | |
+ return false | |
+ for(j <- 0 until data(i).partition_data.size) | |
+ if (data(i).partition_data(j).partition != that.data(i).partition_data(j).partition || !data(i).partition_data(j).messages.equals(that.data(i).partition_data(j).messages)) | |
+ return false | |
+ } | |
+ true | |
+ } | |
+ false | |
case _ => false | |
} | |
} | |
def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest] | |
- override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode | |
- | |
+ override def hashCode: Int = { | |
+ def hcp(num: Int): Int = { | |
+ 31 + (17 * num) | |
+ } | |
+ var hash = hcp(version_id) + hcp(correlation_id) + client_id.hashCode + hcp(required_acks) + hcp(ack_timeout) | |
+ data.foreach(d =>{ | |
+ hash += d.topic.hashCode | |
+ d.partition_data.foreach(p => { | |
+ hash += hcp(p.partition) + p.messages.hashCode | |
+ }) | |
+ }) | |
+ hash | |
+ } | |
} | |
Index: core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala | |
=================================================================== | |
--- core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala (revision 1234442) | |
+++ core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala (working copy) | |
@@ -30,18 +30,17 @@ | |
underlying.send(topic, partition, messages) | |
} | |
- def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic, | |
- kafka.api.ProducerRequest.RandomPartition, | |
- messages) | |
+ def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic, kafka.api.ProducerRequest.RandomPartition, messages) | |
+/*TODO: delete | |
def multiSend(produces: Array[kafka.javaapi.ProducerRequest]) { | |
import kafka.javaapi.Implicits._ | |
val produceRequests = new Array[kafka.api.ProducerRequest](produces.length) | |
for(i <- 0 until produces.length) | |
- produceRequests(i) = new kafka.api.ProducerRequest(produces(i).topic, produces(i).partition, produces(i).messages) | |
+ produceRequests(i) = produces(i).api() | |
underlying.multiSend(produceRequests) | |
} | |
- | |
+*/ | |
def close() { | |
underlying.close | |
} | |
Index: core/src/main/scala/kafka/javaapi/producer/Producer.scala | |
=================================================================== | |
--- core/src/main/scala/kafka/javaapi/producer/Producer.scala (revision 1234442) | |
+++ core/src/main/scala/kafka/javaapi/producer/Producer.scala (working copy) | |
@@ -99,8 +99,11 @@ | |
* synchronous or the asynchronous producer | |
* @param producerData the producer data object that encapsulates the topic, key and message data | |
*/ | |
+ var correlation_id: Int = -1 | |
+ | |
def send(producerData: kafka.javaapi.producer.ProducerData[K,V]) { | |
import collection.JavaConversions._ | |
+ underlying.correlation_id = correlation_id | |
underlying.send(new kafka.producer.ProducerData[K,V](producerData.getTopic, producerData.getKey, | |
asBuffer(producerData.getData))) | |
} | |
@@ -111,6 +114,7 @@ | |
*/ | |
def send(producerData: java.util.List[kafka.javaapi.producer.ProducerData[K,V]]) { | |
import collection.JavaConversions._ | |
+ underlying.correlation_id = correlation_id | |
underlying.send(asBuffer(producerData).map(pd => new kafka.producer.ProducerData[K,V](pd.getTopic, pd.getKey, | |
asBuffer(pd.getData))): _*) | |
} | |
Index: core/src/main/scala/kafka/javaapi/ProducerRequest.scala | |
=================================================================== | |
--- core/src/main/scala/kafka/javaapi/ProducerRequest.scala (revision 1234442) | |
+++ core/src/main/scala/kafka/javaapi/ProducerRequest.scala (working copy) | |
@@ -17,36 +17,31 @@ | |
package kafka.javaapi | |
import kafka.network.Request | |
-import kafka.api.RequestKeys | |
+import kafka.api.{RequestKeys, WiredTopic} | |
import java.nio.ByteBuffer | |
-class ProducerRequest(val topic: String, | |
- val partition: Int, | |
- val messages: kafka.javaapi.message.ByteBufferMessageSet) extends Request(RequestKeys.Produce) { | |
+class ProducerRequest(val correlation_id: Int, | |
+ val client_id: String, | |
+ val required_acks: Short, | |
+ val ack_timeout: Int, | |
+ val data: Array[WiredTopic]) extends Request(RequestKeys.Produce) { | |
+ | |
import Implicits._ | |
- private val underlying = new kafka.api.ProducerRequest(topic, partition, messages) | |
+ private val underlying = new kafka.api.ProducerRequest(correlation_id, client_id, required_acks, ack_timeout, data) | |
+ def api(): kafka.api.ProducerRequest = underlying | |
+ | |
def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) } | |
def sizeInBytes(): Int = underlying.sizeInBytes | |
- def getTranslatedPartition(randomSelector: String => Int): Int = | |
- underlying.getTranslatedPartition(randomSelector) | |
- | |
override def toString: String = | |
underlying.toString | |
- override def equals(other: Any): Boolean = { | |
- other match { | |
- case that: ProducerRequest => | |
- (that canEqual this) && topic == that.topic && partition == that.partition && | |
- messages.equals(that.messages) | |
- case _ => false | |
- } | |
- } | |
+ override def equals(other: Any): Boolean = underlying.equals(other) | |
def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest] | |
- override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode | |
+ override def hashCode: Int = underlying.hashCode | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment