@joestein
Created February 5, 2012 21:23
kafka-240 latest
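
The patch below, taken against Kafka trunk revision 1234442, replaces the old single-topic ProducerRequest with a "wired" multi-topic, multi-partition request format (WiredTopic/WiredPartition), removes multiSend, and threads a caller-supplied correlation_id through the producer path. As an orientation sketch only, not part of the patch, this is roughly how a request is assembled under the new API; the -1 correlation id and the placeholder client_id, acks and timeout values mirror the TODOs used throughout the patch:

import kafka.api.{WiredTopic, WiredPartition}
import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}

// One topic, one partition, two messages (values are illustrative only).
val set = new ByteBufferMessageSet(NoCompressionCodec,
  new Message("a_test1".getBytes), new Message("b_test1".getBytes))
val partitionData = Array(new WiredPartition(0, set))
val data = Array(new WiredTopic("test1", partitionData))

val requiredAcks: Short = 0
val request = new kafka.javaapi.ProducerRequest(-1, "TODO", requiredAcks, 0, data)
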
Index: core/src/test/scala/unit/kafka/utils/TestUtils.scala
===================================================================
--- core/src/test/scala/unit/kafka/utils/TestUtils.scala (revision 1234442)
+++ core/src/test/scala/unit/kafka/utils/TestUtils.scala (working copy)
@@ -29,6 +29,7 @@
import kafka.message._
import org.I0Itec.zkclient.ZkClient
import kafka.consumer.ConsumerConfig
+import kafka.api.{WiredTopic, WiredPartition}
/**
* Utility functions to help with testing
@@ -301,6 +302,24 @@
}
}
+ /**
+ * Create a wired format request based on simple basic information
+ */
+ def produceWiredRequest(topic: String, partition: Int, message: ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
+ produceWiredRequest(-1,topic,partition,message)
+ }
+
+ def produceWiredRequest(correlation_id: Int, topic: String, partition: Int, message: ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
+ val client_id = "TODO" //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from
+ val required_acks: Short = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from
+ val ack_timeout = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from
+ var data = new Array[WiredTopic](1)
+ var partition_data = new Array[WiredPartition](1)
+ partition_data(0) = new WiredPartition(partition,message)
+ data(0) = new WiredTopic(topic,partition_data)
+ val producerRequest = new kafka.javaapi.ProducerRequest(correlation_id, client_id, required_acks, ack_timeout, data)
+ producerRequest
+ }
}
object TestZKUtils {
Index: core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (revision 1234442)
+++ core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (working copy)
@@ -19,7 +19,7 @@
import scala.collection._
import junit.framework.Assert._
-import kafka.api.{ProducerRequest, FetchRequest}
+import kafka.api.{ProducerRequest, FetchRequest, WiredTopic, WiredPartition}
import kafka.common.{OffsetOutOfRangeException, InvalidPartitionException}
import kafka.server.{KafkaRequestHandlers, KafkaConfig}
import org.apache.log4j.{Level, Logger}
@@ -216,16 +216,25 @@
val topics = List("test1", "test2", "test3");
val messages = new mutable.HashMap[String, ByteBufferMessageSet]
val fetches = new mutable.ArrayBuffer[FetchRequest]
- var produceList: List[ProducerRequest] = Nil
+ val client_id = "TODO" //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from
+ val required_acks: Short = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from
+ val ack_timeout = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from
+ var data = new Array[WiredTopic](3)
+ var index = 0
for(topic <- topics) {
val set = new ByteBufferMessageSet(NoCompressionCodec,
new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
messages += topic -> set
- produceList ::= new ProducerRequest(topic, 0, set)
+ var partition_data = new Array[WiredPartition](1)
+ partition_data(0) = new WiredPartition(0,set)
+ data(index) = new WiredTopic(topic,partition_data)
+ index += 1
fetches += new FetchRequest(topic, 0, 0, 10000)
}
- producer.multiSend(produceList.toArray)
+ val producerRequest = new kafka.javaapi.ProducerRequest(-1, client_id, required_acks, ack_timeout, data)
+ producer.send(producerRequest)
+
for (messageSet <- messages.values)
messageSet.getBuffer.rewind
@@ -241,16 +250,25 @@
val topics = List("test1", "test2", "test3");
val messages = new mutable.HashMap[String, ByteBufferMessageSet]
val fetches = new mutable.ArrayBuffer[FetchRequest]
- var produceList: List[ProducerRequest] = Nil
+ val client_id = "TODO" //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from
+ val required_acks: Short = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from
+ val ack_timeout = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from
+ var data = new Array[WiredTopic](3)
+ var index = 0
for(topic <- topics) {
val set = new ByteBufferMessageSet(DefaultCompressionCodec,
new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
messages += topic -> set
- produceList ::= new ProducerRequest(topic, 0, set)
+ var partition_data = new Array[WiredPartition](1)
+ partition_data(0) = new WiredPartition(0,set)
+ data(index) = new WiredTopic(topic,partition_data)
+ index += 1
fetches += new FetchRequest(topic, 0, 0, 10000)
}
- producer.multiSend(produceList.toArray)
+ val producerRequest = new kafka.javaapi.ProducerRequest(-1, client_id, required_acks, ack_timeout, data)
+ producer.send(producerRequest)
+
for (messageSet <- messages.values)
messageSet.getBuffer.rewind
Index: core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (revision 1234442)
+++ core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (working copy)
@@ -20,7 +20,7 @@
import scala.collection._
import junit.framework.Assert._
import kafka.common.OffsetOutOfRangeException
-import kafka.api.{ProducerRequest, FetchRequest}
+import kafka.api.{ProducerRequest, FetchRequest, WiredTopic, WiredPartition}
import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig}
import org.apache.log4j.{Level, Logger}
import org.scalatest.junit.JUnit3Suite
@@ -132,16 +132,25 @@
val topics = List("test1", "test2", "test3");
val messages = new mutable.HashMap[String, ByteBufferMessageSet]
val fetches = new mutable.ArrayBuffer[FetchRequest]
- var produceList: List[ProducerRequest] = Nil
+ val client_id = "TODO" //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from
+ val required_acks: Short = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from
+ val ack_timeout = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from
+ var data = new Array[WiredTopic](3)
+ var index = 0
for(topic <- topics) {
val set = new ByteBufferMessageSet(NoCompressionCodec,
new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
messages += topic -> set
- produceList ::= new ProducerRequest(topic, 0, set)
+ var partition_data = new Array[WiredPartition](1)
+ partition_data(0) = new WiredPartition(0,set)
+ data(index) = new WiredTopic(topic,partition_data)
+ index += 1
fetches += new FetchRequest(topic, 0, 0, 10000)
}
- producer.multiSend(produceList.toArray)
+ val producerRequest = new kafka.javaapi.ProducerRequest(-1, client_id, required_acks, ack_timeout, data)
+ producer.send(producerRequest)
+
for (messageSet <- messages.values)
messageSet.getBuffer.rewind
@@ -157,18 +166,27 @@
val topics = List("test1", "test2", "test3");
val messages = new mutable.HashMap[String, ByteBufferMessageSet]
val fetches = new mutable.ArrayBuffer[FetchRequest]
- var produceList: List[ProducerRequest] = Nil
+ val client_id = "TODO" //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from
+ val required_acks: Short = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from
+ val ack_timeout = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from
+ var data = new Array[WiredTopic](3)
+ var index = 0
for(topic <- topics) {
val set = new ByteBufferMessageSet(NoCompressionCodec,
new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
messages += topic -> set
- produceList ::= new ProducerRequest(topic, 0, set)
+ var partition_data = new Array[WiredPartition](1)
+ partition_data(0) = new WiredPartition(0,set)
+ data(index) = new WiredTopic(topic,partition_data)
+ index += 1
fetches += new FetchRequest(topic, 0, 0, 10000)
}
- producer.multiSend(produceList.toArray)
+ val producerRequest = new kafka.javaapi.ProducerRequest(-1, client_id, required_acks, ack_timeout, data)
+ producer.send(producerRequest)
+
// resend the same multisend
- producer.multiSend(produceList.toArray)
+ producer.send(producerRequest)
for (messageSet <- messages.values)
messageSet.getBuffer.rewind
Index: core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (revision 1234442)
+++ core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (working copy)
@@ -77,11 +77,6 @@
val secondEnd = SystemTime.milliseconds
Assert.assertTrue((secondEnd-secondStart) < 500)
- try {
- producer.multiSend(Array(new ProducerRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))))
- }catch {
- case e: Exception => failed=true
- }
Assert.assertFalse(failed)
}
Index: core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (revision 1234442)
+++ core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (working copy)
@@ -27,6 +27,7 @@
import kafka.producer.async._
import kafka.serializer.Encoder
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+import kafka.utils.TestUtils
class AsyncProducerTest extends JUnitSuite {
@@ -42,8 +43,8 @@
@Test
def testProducerQueueSize() {
val basicProducer = EasyMock.createMock(classOf[SyncProducer])
- basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition,
- getMessageSetOfSize(List(message1), 10)))))
+ basicProducer.send(TestUtils.produceWiredRequest(topic1, ProducerRequest.RandomPartition,
+ getMessageSetOfSize(List(message1), 10)))
EasyMock.expectLastCall
basicProducer.close
EasyMock.expectLastCall
@@ -80,8 +81,8 @@
@Test
def testAddAfterQueueClosed() {
val basicProducer = EasyMock.createMock(classOf[SyncProducer])
- basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition,
- getMessageSetOfSize(List(message1), 10)))))
+ basicProducer.send(TestUtils.produceWiredRequest(topic1, ProducerRequest.RandomPartition,
+ getMessageSetOfSize(List(message1), 10)))
EasyMock.expectLastCall
basicProducer.close
EasyMock.expectLastCall
@@ -114,11 +115,11 @@
@Test
def testBatchSize() {
val basicProducer = EasyMock.createStrictMock(classOf[SyncProducer])
- basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition,
- getMessageSetOfSize(List(message1), 5)))))
+ basicProducer.send(TestUtils.produceWiredRequest(topic1, ProducerRequest.RandomPartition,
+ getMessageSetOfSize(List(message1), 5)))
EasyMock.expectLastCall.times(2)
- basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition,
- getMessageSetOfSize(List(message1), 1)))))
+ basicProducer.send(TestUtils.produceWiredRequest(topic1, ProducerRequest.RandomPartition,
+ getMessageSetOfSize(List(message1), 1)))
EasyMock.expectLastCall
basicProducer.close
EasyMock.expectLastCall
@@ -155,8 +156,8 @@
@Test
def testQueueTimeExpired() {
val basicProducer = EasyMock.createMock(classOf[SyncProducer])
- basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition,
- getMessageSetOfSize(List(message1), 3)))))
+ basicProducer.send(TestUtils.produceWiredRequest(topic1, ProducerRequest.RandomPartition,
+ getMessageSetOfSize(List(message1), 3)))
EasyMock.expectLastCall
basicProducer.close
EasyMock.expectLastCall
@@ -211,10 +212,11 @@
@Test
def testCollateEvents() {
val basicProducer = EasyMock.createMock(classOf[SyncProducer])
- basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic2, ProducerRequest.RandomPartition,
+/*TODO: KAFKA-240 basicProducer.send(TestUtils.produceWiredRequest(topic2, ProducerRequest.RandomPartition,
getMessageSetOfSize(List(message2), 5)),
new ProducerRequest(topic1, ProducerRequest.RandomPartition,
getMessageSetOfSize(List(message1), 5)))))
+*/
EasyMock.expectLastCall
basicProducer.close
EasyMock.expectLastCall
@@ -246,7 +248,7 @@
@Test
def testCollateAndSerializeEvents() {
val basicProducer = EasyMock.createMock(classOf[SyncProducer])
- basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic2, 1,
+/*TODO: KAFKA-240 basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic2, 1,
getMessageSetOfSize(List(message2), 5)),
new ProducerRequest(topic1, 0,
getMessageSetOfSize(List(message1), 5)),
@@ -254,7 +256,7 @@
getMessageSetOfSize(List(message1), 5)),
new ProducerRequest(topic2, 0,
getMessageSetOfSize(List(message2), 5)))))
-
+*/
EasyMock.expectLastCall
basicProducer.close
EasyMock.expectLastCall
@@ -304,8 +306,5 @@
override def send(topic: String, messages: ByteBufferMessageSet): Unit = {
Thread.sleep(1000)
}
- override def multiSend(produces: Array[ProducerRequest]) {
- Thread.sleep(1000)
- }
}
}
Index: core/src/test/scala/unit/kafka/producer/ProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/ProducerTest.scala (revision 1234442)
+++ core/src/test/scala/unit/kafka/producer/ProducerTest.scala (working copy)
@@ -241,7 +241,8 @@
props.put("serializer.class", "kafka.producer.StringSerializer")
val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
- producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1")))
+ val correlation_id = -1
+ producerPool.send(correlation_id, producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1")))
producerPool.close
EasyMock.verify(syncProducer1)
@@ -273,7 +274,8 @@
props.put("producer.type", "async")
val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
new ConcurrentHashMap[Int, SyncProducer](), asyncProducers)
- producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1")))
+ val correlation_id = -1
+ producerPool.send(correlation_id, producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1")))
producerPool.close
EasyMock.verify(asyncProducer1)
@@ -299,7 +301,8 @@
val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
try {
- producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1")))
+ val correlation_id = -1
+ producerPool.send(correlation_id, producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1")))
Assert.fail("Should fail with UnavailableProducerException")
}catch {
case e: UnavailableProducerException => // expected
@@ -330,7 +333,8 @@
val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
new ConcurrentHashMap[Int, SyncProducer](), asyncProducers)
try {
- producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1")))
+ val correlation_id = -1
+ producerPool.send(correlation_id, producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1")))
Assert.fail("Should fail with UnavailableProducerException")
}catch {
case e: UnavailableProducerException => // expected
Index: core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala (revision 1234442)
+++ core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala (working copy)
@@ -27,6 +27,8 @@
import kafka.javaapi.ProducerRequest
import kafka.utils.TestUtils
import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message}
+import kafka.api.{WiredTopic, WiredPartition}
+import kafka.javaapi.Implicits._
/**
* End to end tests of the primitive apis against a local server
@@ -342,16 +344,24 @@
val topics = List("test1", "test2", "test3");
val messages = new mutable.HashMap[String, ByteBufferMessageSet]
val fetches = new mutable.ArrayBuffer[FetchRequest]
- var produceList: List[ProducerRequest] = Nil
+ val client_id = "TODO" //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from
+ val required_acks: Short = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from
+ val ack_timeout = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from
+ var data = new Array[WiredTopic](3)
+ var index = 0
for(topic <- topics) {
val set = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
messages = getMessageList(new Message(("a_" + topic).getBytes),
new Message(("b_" + topic).getBytes)))
messages += topic -> set
- produceList ::= new ProducerRequest(topic, 0, set)
+ var partition_data = new Array[WiredPartition](1)
+ partition_data(0) = new WiredPartition(0,set)
+ data(index) = new WiredTopic(topic,partition_data)
+ index += 1
fetches += new FetchRequest(topic, 0, 0, 10000)
}
- producer.multiSend(produceList.toArray)
+ val producerRequest = new kafka.javaapi.ProducerRequest(-1, client_id, required_acks, ack_timeout, data)
+ producer.send(producerRequest)
for (messageSet <- messages.values)
messageSet.getBuffer.rewind
@@ -375,16 +385,24 @@
val topics = List("test1", "test2", "test3");
val messages = new mutable.HashMap[String, ByteBufferMessageSet]
val fetches = new mutable.ArrayBuffer[FetchRequest]
- var produceList: List[ProducerRequest] = Nil
+ val client_id = "TODO" //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from
+ val required_acks: Short = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from
+ val ack_timeout = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from
+ var data = new Array[WiredTopic](3)
+ var index = 0
for(topic <- topics) {
val set = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
messages = getMessageList(new Message(("a_" + topic).getBytes),
new Message(("b_" + topic).getBytes)))
messages += topic -> set
- produceList ::= new ProducerRequest(topic, 0, set)
+ var partition_data = new Array[WiredPartition](1)
+ partition_data(0) = new WiredPartition(0,set)
+ data(index) = new WiredTopic(topic,partition_data)
+ index += 1
fetches += new FetchRequest(topic, 0, 0, 10000)
}
- producer.multiSend(produceList.toArray)
+ val producerRequest = new kafka.javaapi.ProducerRequest(-1, client_id, required_acks, ack_timeout, data)
+ producer.send(producerRequest)
for (messageSet <- messages.values)
messageSet.getBuffer.rewind
Index: core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala (revision 1234442)
+++ core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala (working copy)
@@ -80,13 +80,6 @@
val secondEnd = SystemTime.milliseconds
Assert.assertTrue((secondEnd-secondEnd) < 500)
- try {
- producer.multiSend(Array(new ProducerRequest("test", 0,
- new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
- messages = getMessageList(new Message(messageBytes))))))
- }catch {
- case e: Exception => failed=true
- }
Assert.assertFalse(failed)
}
Index: core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala (revision 1234442)
+++ core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala (working copy)
@@ -235,7 +235,8 @@
props.put("serializer.class", "kafka.producer.StringSerializer")
val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
- producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1")))
+ val correlation_id = -1
+ producerPool.send(correlation_id, producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1")))
producerPool.close
EasyMock.verify(syncProducer1)
@@ -267,7 +268,8 @@
props.put("producer.type", "async")
val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
new ConcurrentHashMap[Int, kafka.producer.SyncProducer](), asyncProducers)
- producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1")))
+ val correlation_id = -1
+ producerPool.send(correlation_id, producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1")))
producerPool.close
EasyMock.verify(asyncProducer1)
@@ -293,7 +295,8 @@
val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
try {
- producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1")))
+ val correlation_id = -1
+ producerPool.send(correlation_id,producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1")))
Assert.fail("Should fail with UnavailableProducerException")
}catch {
case e: UnavailableProducerException => // expected
@@ -324,7 +327,8 @@
val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
new ConcurrentHashMap[Int, kafka.producer.SyncProducer](), asyncProducers)
try {
- producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1")))
+ val correlation_id = -1
+ producerPool.send(correlation_id,producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1")))
Assert.fail("Should fail with UnavailableProducerException")
}catch {
case e: UnavailableProducerException => // expected
Index: core/src/main/scala/kafka/producer/ProducerPool.scala
===================================================================
--- core/src/main/scala/kafka/producer/ProducerPool.scala (revision 1234442)
+++ core/src/main/scala/kafka/producer/ProducerPool.scala (working copy)
@@ -26,6 +26,8 @@
import kafka.common.{UnavailableProducerException, InvalidConfigException}
import kafka.utils.{Utils, Logging}
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet}
+import collection.mutable.{HashMap, ListBuffer}
+import kafka.api.{WiredTopic, WiredPartition}
class ProducerPool[V](private val config: ProducerConfig,
private val serializer: Encoder[V],
@@ -94,7 +96,7 @@
* producer to publish the data to the specified broker partition
* @param poolData the producer pool request object
*/
- def send(poolData: ProducerPoolData[V]*) {
+ def send(correlation_id: Int, poolData: ProducerPoolData[V]*) {
val distinctBrokers = poolData.map(pd => pd.getBidPid.brokerId).distinct
var remainingRequests = poolData.toSeq
distinctBrokers.foreach { bid =>
@@ -102,18 +104,20 @@
remainingRequests = requestsForThisBid._2
if(sync) {
- val producerRequests = requestsForThisBid._1.map(req => new ProducerRequest(req.getTopic, req.getBidPid.partId,
- new ByteBufferMessageSet(compressionCodec = config.compressionCodec,
- messages = req.getData.map(d => serializer.toMessage(d)): _*)))
+ val client_id = "TODO" //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from
+ val required_acks: Short = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from
+ val ack_timeout = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from
+ val topics = new HashMap[String, ListBuffer[WiredPartition]]()
+ requestsForThisBid._1.map(req => {
+ topics(req.getTopic).append(new WiredPartition(req.getBidPid.partId, new ByteBufferMessageSet(compressionCodec = config.compressionCodec,
+ messages = req.getData.map(d => serializer.toMessage(d)): _*)))
+ })
+ val wired_topics = topics.map(kv => new WiredTopic(kv._1,kv._2.toArray))
+ val producerRequest = new kafka.javaapi.ProducerRequest(correlation_id, client_id, required_acks, ack_timeout, wired_topics.toArray)
debug("Fetching sync producer for broker id: " + bid)
val producer = syncProducers.get(bid)
- if(producer != null) {
- if(producerRequests.size > 1)
- producer.multiSend(producerRequests.toArray)
- else
- producer.send(topic = producerRequests(0).topic,
- partition = producerRequests(0).partition,
- messages = producerRequests(0).messages)
+ if(producer != null) {
+ producer.send(producerRequest)
config.compressionCodec match {
case NoCompressionCodec => debug("Sending message to broker " + bid)
case _ => debug("Sending compressed messages to broker " + bid)
Index: core/src/main/scala/kafka/producer/SyncProducer.scala
===================================================================
--- core/src/main/scala/kafka/producer/SyncProducer.scala (revision 1234442)
+++ core/src/main/scala/kafka/producer/SyncProducer.scala (working copy)
@@ -50,7 +50,8 @@
if (logger.isTraceEnabled) {
trace("verifying sendbuffer of size " + buffer.limit)
val requestTypeId = buffer.getShort()
- if (requestTypeId == RequestKeys.MultiProduce) {
+ //TODO: KAFKA-240 need to understand purpose of this
+ /* if (requestTypeId == RequestKeys.MultiProduce) {
try {
val request = MultiProducerRequest.readFrom(buffer)
for (produce <- request.produces) {
@@ -69,7 +70,7 @@
case e: Throwable =>
trace("error verifying sendbuffer ", e)
}
- }
+ }*/
}
}
@@ -107,15 +108,37 @@
/**
* Send a message
*/
+ def send(producerRequest: kafka.javaapi.ProducerRequest) {
+ producerRequest.data.foreach(d => {
+ d.partition_data.foreach(p => {
+ verifyMessageSize(p.messages)
+ val setSize = p.messages.sizeInBytes.asInstanceOf[Int]
+ trace("Got message set with " + setSize + " bytes to send")
+ })
+ })
+ send(new BoundedByteBufferSend(producerRequest))
+ }
+
def send(topic: String, partition: Int, messages: ByteBufferMessageSet) {
- verifyMessageSize(messages)
- val setSize = messages.sizeInBytes.asInstanceOf[Int]
- trace("Got message set with " + setSize + " bytes to send")
- send(new BoundedByteBufferSend(new ProducerRequest(topic, partition, messages)))
+ send(-1,topic,partition,messages)
}
-
- def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic, ProducerRequest.RandomPartition, messages)
+ def send(topic: String, messages: ByteBufferMessageSet) {
+ send(-1,topic,ProducerRequest.RandomPartition,messages)
+ }
+
+ def send(correlation_id: Int, topic: String, partition: Int, messages: ByteBufferMessageSet) {
+ val client_id = "TODO" //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from
+ val required_acks: Short = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from
+ val ack_timeout = 0 //TODO: KAFKA-240 make this come from configuration file or some other object that it can grab it from
+ var data = new Array[WiredTopic](1)
+ var partition_data = new Array[WiredPartition](1)
+ partition_data(0) = new WiredPartition(partition,messages)
+ data(0) = new WiredTopic(topic,partition_data)
+ val producerRequest = new kafka.javaapi.ProducerRequest(correlation_id, client_id, required_acks, ack_timeout, data)
+ send(producerRequest)
+ }
+/*TODO: delete
def multiSend(produces: Array[ProducerRequest]) {
for (request <- produces)
verifyMessageSize(request.messages)
@@ -123,7 +146,7 @@
trace("Got multi message sets with " + setSize + " bytes to send")
send(new BoundedByteBufferSend(new MultiProducerRequest(produces)))
}
-
+*/
def close() = {
lock synchronized {
disconnect()
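
With the send overloads added above, existing single-topic call sites keep working: the two- and three-argument forms default correlation_id to -1 and wrap their arguments into a wired-format request before hitting the wire. A minimal usage sketch, assuming a broker listening on localhost:9092; the host/port property values are placeholders, not something this patch prescribes:

import java.util.Properties
import kafka.producer.{SyncProducer, SyncProducerConfig}
import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}

val props = new Properties()
props.put("host", "localhost")   // assumed broker host
props.put("port", "9092")        // assumed broker port
val producer = new SyncProducer(new SyncProducerConfig(props))

val set = new ByteBufferMessageSet(NoCompressionCodec, new Message("payload".getBytes))
producer.send("test", 0, set)       // delegates to send(-1, "test", 0, set)
producer.send(42, "test", 0, set)   // caller-supplied correlation_id
producer.close()
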
Index: core/src/main/scala/kafka/producer/Producer.scala
===================================================================
--- core/src/main/scala/kafka/producer/Producer.scala (revision 1234442)
+++ core/src/main/scala/kafka/producer/Producer.scala (working copy)
@@ -93,6 +93,8 @@
this(config, if(partitioner == null) new DefaultPartitioner[K] else partitioner,
new ProducerPool[V](config, encoder, eventHandler, cbkHandler), true, null)
+ var correlation_id: Int = 0 //allows the client to send request specific information through the producer
+
/**
* Sends the data, partitioned by key to the topic using either the
* synchronous or the asynchronous producer
@@ -138,7 +140,7 @@
new Partition(brokerIdPartition.get.brokerId, brokerIdPartition.get.partId),
pd.getData)
}
- producerPool.send(producerPoolRequests: _*)
+ producerPool.send(correlation_id, producerPoolRequests: _*)
}
private def configSend(producerData: ProducerData[K,V]*) {
@@ -160,7 +162,7 @@
new Partition(brokerIdPartition.brokerId, partition),
pd.getData)
}
- producerPool.send(producerPoolRequests: _*)
+ producerPool.send(correlation_id, producerPoolRequests: _*)
}
private def getPartitionListForTopic(pd: ProducerData[K,V]): Seq[Partition] = {
Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (revision 1234442)
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (working copy)
@@ -46,8 +46,9 @@
private def send(messagesPerTopic: Map[(String, Int), ByteBufferMessageSet], syncProducer: SyncProducer) {
if(messagesPerTopic.size > 0) {
- val requests = messagesPerTopic.map(f => new ProducerRequest(f._1._1, f._1._2, f._2)).toArray
- syncProducer.multiSend(requests)
+ //TODO KAFKA-240
+ //val requests = messagesPerTopic.map(f => new ProducerRequest(f._1._1, f._1._2, f._2)).toArray
+ //syncProducer.multiSend(requests)
trace("kafka producer sent messages for topics %s to broker %s:%d"
.format(messagesPerTopic, syncProducer.config.host, syncProducer.config.port))
}
Index: core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (revision 1234442)
+++ core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (working copy)
@@ -32,8 +32,6 @@
*/
private[kafka] class KafkaRequestHandlers(val logManager: LogManager) extends Logging {
- private val requestLogger = Logger.getLogger("kafka.request.logger")
-
def handlerFor(requestTypeId: Short, request: Receive): Handler.Handler = {
requestTypeId match {
case RequestKeys.Produce => handleProducerRequest _
@@ -48,55 +46,59 @@
def handleProducerRequest(receive: Receive): Option[Send] = {
val sTime = SystemTime.milliseconds
val request = ProducerRequest.readFrom(receive.buffer)
-
- if(requestLogger.isTraceEnabled)
- requestLogger.trace("Producer request " + request.toString)
- handleProducerRequest(request, "ProduceRequest")
+ trace("Producer request " + request.toString)
debug("kafka produce time " + (SystemTime.milliseconds - sTime) + " ms")
- None
+ Some(handleProducerRequest(request, "ProduceRequest"))
}
def handleMultiProducerRequest(receive: Receive): Option[Send] = {
val request = MultiProducerRequest.readFrom(receive.buffer)
- if(requestLogger.isTraceEnabled)
- requestLogger.trace("Multiproducer request " + request.toString)
+ trace("Multiproducer request " + request.toString)
request.produces.map(handleProducerRequest(_, "MultiProducerRequest"))
None
}
- private def handleProducerRequest(request: ProducerRequest, requestHandlerName: String) = {
- val partition = request.getTranslatedPartition(logManager.chooseRandomPartition)
- try {
- logManager.getOrCreateLog(request.topic, partition).append(request.messages)
- trace(request.messages.sizeInBytes + " bytes written to logs.")
- request.messages.foreach(m => trace("wrote message %s to disk".format(m.message.checksum)))
- }
- catch {
- case e =>
- error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e)
- e match {
- case _: IOException =>
- fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
- Runtime.getRuntime.halt(1)
- case _ =>
+ private def handleProducerRequest(request: ProducerRequest, requestHandlerName: String): ProducerResponse = {
+ val requestSize = request.data.size
+ val errors = new Array[Int](requestSize)
+ val offsets = new Array[Long](requestSize)
+
+ request.data.foreach(d => {
+ d.partition_data.foreach(p => {
+ val partition = p.getTranslatedPartition(d.topic, logManager.chooseRandomPartition)
+ try {
+ logManager.getOrCreateLog(d.topic, partition).append(p.messages)
+ trace(p.messages.sizeInBytes + " bytes written to logs.")
+ p.messages.foreach(m => trace("wrote message %s to disk".format(m.message.checksum)))
}
- throw e
- }
- None
+ catch {
+ case e =>
+ //TODO: handle response in ProducerResponse
+ error("Error processing " + requestHandlerName + " on " + d.topic + ":" + partition, e)
+ e match {
+ case _: IOException =>
+ fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
+ Runtime.getRuntime.halt(1)
+ case _ =>
+ }
+ //throw e
+ }
+ })
+ //None
+ })
+ new ProducerResponse(request.correlation_id, request.version_id, errors, offsets)
}
def handleFetchRequest(request: Receive): Option[Send] = {
val fetchRequest = FetchRequest.readFrom(request.buffer)
- if(requestLogger.isTraceEnabled)
- requestLogger.trace("Fetch request " + fetchRequest.toString)
+ trace("Fetch request " + fetchRequest.toString)
Some(readMessageSet(fetchRequest))
}
def handleMultiFetchRequest(request: Receive): Option[Send] = {
val multiFetchRequest = MultiFetchRequest.readFrom(request.buffer)
- if(requestLogger.isTraceEnabled)
- requestLogger.trace("Multifetch request")
- multiFetchRequest.fetches.foreach(req => requestLogger.trace(req.toString))
+ trace("Multifetch request")
+ multiFetchRequest.fetches.foreach(req => trace(req.toString))
var responses = multiFetchRequest.fetches.map(fetch =>
readMessageSet(fetch)).toList
@@ -123,8 +125,7 @@
def handleOffsetRequest(request: Receive): Option[Send] = {
val offsetRequest = OffsetRequest.readFrom(request.buffer)
- if(requestLogger.isTraceEnabled)
- requestLogger.trace("Offset request " + offsetRequest.toString)
+ trace("Offset request " + offsetRequest.toString)
val offsets = logManager.getOffsets(offsetRequest)
val response = new OffsetArraySend(offsets)
Some(response)
Index: core/src/main/scala/kafka/api/ProducerRequest.scala
===================================================================
--- core/src/main/scala/kafka/api/ProducerRequest.scala (revision 1234442)
+++ core/src/main/scala/kafka/api/ProducerRequest.scala (working copy)
@@ -22,47 +22,112 @@
import kafka.network._
import kafka.utils._
+object WiredFormat {
+ val version_id: Short = 0
+}
+
+class WiredTopic(val topic: String, val partition_data: Array[WiredPartition])
+
+class WiredPartition(val partition: Int, val messages: ByteBufferMessageSet) {
+ def getTranslatedPartition(topic: String, randomSelector: String => Int): Int = {
+ if (partition == ProducerRequest.RandomPartition)
+ return randomSelector(topic)
+ else
+ return partition
+ }
+}
+
object ProducerRequest {
val RandomPartition = -1
-
+
def readFrom(buffer: ByteBuffer): ProducerRequest = {
- val topic = Utils.readShortString(buffer, "UTF-8")
- val partition = buffer.getInt
- val messageSetSize = buffer.getInt
- val messageSetBuffer = buffer.slice()
- messageSetBuffer.limit(messageSetSize)
- buffer.position(buffer.position + messageSetSize)
- new ProducerRequest(topic, partition, new ByteBufferMessageSet(messageSetBuffer))
+ val version_id: Short = buffer.getShort
+ val correlation_id: Int = buffer.getInt
+ val client_id: String = Utils.readShortString(buffer, "UTF-8")
+ val required_acks: Short = buffer.getShort
+ val ack_timeout: Int = buffer.getInt
+ //build the topic structure
+ val topicCount = buffer.getInt
+ val data = new Array[WiredTopic](topicCount)
+ for(i <- 0 until topicCount) {
+ val topic = Utils.readShortString(buffer, "UTF-8")
+
+ val partitionCount = buffer.getInt
+ //build the partition structure within this topic
+ val partition_data = new Array[WiredPartition](partitionCount)
+ for (j <- 0 until partitionCount) {
+ val partition = buffer.getInt
+ val messageSetSize = buffer.getInt
+ val messageSetBuffer = new Array[Byte](messageSetSize)
+ buffer.get(messageSetBuffer,0,messageSetSize)
+ partition_data(j) = new WiredPartition(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer)))
+ }
+ data(i) = new WiredTopic(topic,partition_data)
+ }
+ new ProducerRequest(correlation_id,client_id,required_acks,ack_timeout,data)
}
}
-class ProducerRequest(val topic: String,
- val partition: Int,
- val messages: ByteBufferMessageSet) extends Request(RequestKeys.Produce) {
+class ProducerRequest(val correlation_id: Int,
+ val client_id: String,
+ val required_acks: Short,
+ val ack_timeout: Int,
+ val data: Array[WiredTopic]) extends Request(RequestKeys.Produce) {
+ val version_id: Short = WiredFormat.version_id
+
def writeTo(buffer: ByteBuffer) {
- Utils.writeShortString(buffer, topic, "UTF-8")
- buffer.putInt(partition)
- buffer.putInt(messages.serialized.limit)
- buffer.put(messages.serialized)
- messages.serialized.rewind
+ buffer.putShort(version_id)
+ buffer.putInt(correlation_id)
+ Utils.writeShortString(buffer, client_id, "UTF-8")
+ buffer.putShort(required_acks)
+ buffer.putInt(ack_timeout)
+ //save the topic structure
+ buffer.putInt(data.size) //the number of topics
+ data.foreach(d =>{
+ Utils.writeShortString(buffer, d.topic, "UTF-8") //write the topic
+ buffer.putInt(d.partition_data.size) //the number of partitions
+ d.partition_data.foreach(p => {
+ buffer.putInt(p.partition)
+ buffer.putInt(p.messages.serialized.limit)
+ buffer.put(p.messages.serialized)
+ p.messages.serialized.rewind
+ })
+ })
}
-
- def sizeInBytes(): Int = 2 + topic.length + 4 + 4 + messages.sizeInBytes.asInstanceOf[Int]
- def getTranslatedPartition(randomSelector: String => Int): Int = {
- if (partition == ProducerRequest.RandomPartition)
- return randomSelector(topic)
- else
- return partition
+ def sizeInBytes(): Int = {
+ var size = 0
+ //size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout, data.size
+ size = 2 + 4 + 2 + client_id.length + 2 + 4 + 4;
+ data.foreach(d =>{
+ size += 2 + d.topic.length + 4
+ d.partition_data.foreach(p => {
+ size += 4 + 4 + p.messages.sizeInBytes.asInstanceOf[Int]
+ })
+ })
+ size
}
+
override def toString: String = {
val builder = new StringBuilder()
builder.append("ProducerRequest(")
- builder.append(topic + ",")
- builder.append(partition + ",")
- builder.append(messages.sizeInBytes)
+ builder.append(version_id + ",")
+ builder.append(correlation_id + ",")
+ builder.append(client_id + ",")
+ builder.append(required_acks + ",")
+ builder.append(ack_timeout)
+ data.foreach(d =>{
+ builder.append(":[" + d.topic)
+ d.partition_data.foreach(p => {
+ builder.append(":[")
+ builder.append(p.partition + ",")
+ builder.append(p.messages.sizeInBytes)
+ builder.append("]")
+ })
+ builder.append("]")
+ })
builder.append(")")
builder.toString
}
@@ -70,14 +135,36 @@
override def equals(other: Any): Boolean = {
other match {
case that: ProducerRequest =>
- (that canEqual this) && topic == that.topic && partition == that.partition &&
- messages.equals(that.messages)
+ if (that canEqual this)
+ if (version_id == that.version_id && correlation_id == that.correlation_id &&
+ client_id == that.client_id && required_acks == that.required_acks && ack_timeout == that.ack_timeout) {
+ for(i <- 0 until data.size) {
+ if (data(i).topic != that.data(i).topic)
+ return false
+ for(j <- 0 until data(i).partition_data.size)
+ if (data(i).partition_data(j).partition != that.data(i).partition_data(j).partition || !data(i).partition_data(j).messages.equals(that.data(i).partition_data(j).messages))
+ return false
+ }
+ true
+ }
+ false
case _ => false
}
}
def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest]
- override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode
-
+ override def hashCode: Int = {
+ def hcp(num: Int): Int = {
+ 31 + (17 * num)
+ }
+ var hash = hcp(version_id) + hcp(correlation_id) + client_id.hashCode + hcp(required_acks) + hcp(ack_timeout)
+ data.foreach(d =>{
+ hash += d.topic.hashCode
+ d.partition_data.foreach(p => {
+ hash += hcp(p.partition) + p.messages.hashCode
+ })
+ })
+ hash
+ }
}
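
For reference, writeTo and readFrom above agree on this layout: version_id (int16), correlation_id (int32), client_id (length-prefixed string), required_acks (int16), ack_timeout (int32), a topic count (int32), then per topic the topic name and a partition count, and per partition the partition id, the message-set size and the message-set bytes. A small round-trip sketch of the two methods, with illustrative values only:

import java.nio.ByteBuffer
import kafka.api.{ProducerRequest, WiredTopic, WiredPartition}
import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}

val set = new ByteBufferMessageSet(NoCompressionCodec, new Message("x".getBytes))
val requiredAcks: Short = 0
val request = new ProducerRequest(-1, "client", requiredAcks, 0,
  Array(new WiredTopic("test", Array(new WiredPartition(0, set)))))

// sizeInBytes is intended to cover what writeTo emits, so the buffer is sized from it.
val buffer = ByteBuffer.allocate(request.sizeInBytes)
request.writeTo(buffer)
buffer.rewind()
val decoded = ProducerRequest.readFrom(buffer)
println(decoded.correlation_id + " " + decoded.data(0).topic)  // prints: -1 test
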
Index: core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala (revision 1234442)
+++ core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala (working copy)
@@ -30,18 +30,17 @@
underlying.send(topic, partition, messages)
}
- def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic,
- kafka.api.ProducerRequest.RandomPartition,
- messages)
+ def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic, kafka.api.ProducerRequest.RandomPartition, messages)
+/*TODO: delete
def multiSend(produces: Array[kafka.javaapi.ProducerRequest]) {
import kafka.javaapi.Implicits._
val produceRequests = new Array[kafka.api.ProducerRequest](produces.length)
for(i <- 0 until produces.length)
- produceRequests(i) = new kafka.api.ProducerRequest(produces(i).topic, produces(i).partition, produces(i).messages)
+ produceRequests(i) = produces(i).api()
underlying.multiSend(produceRequests)
}
-
+*/
def close() {
underlying.close
}
Index: core/src/main/scala/kafka/javaapi/producer/Producer.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/producer/Producer.scala (revision 1234442)
+++ core/src/main/scala/kafka/javaapi/producer/Producer.scala (working copy)
@@ -99,8 +99,11 @@
* synchronous or the asynchronous producer
* @param producerData the producer data object that encapsulates the topic, key and message data
*/
+ var correlation_id: Int = 0
+
def send(producerData: kafka.javaapi.producer.ProducerData[K,V]) {
import collection.JavaConversions._
+ underlying.correlation_id = correlation_id
underlying.send(new kafka.producer.ProducerData[K,V](producerData.getTopic, producerData.getKey,
asBuffer(producerData.getData)))
}
@@ -111,6 +114,7 @@
*/
def send(producerData: java.util.List[kafka.javaapi.producer.ProducerData[K,V]]) {
import collection.JavaConversions._
+ underlying.correlation_id = correlation_id
underlying.send(asBuffer(producerData).map(pd => new kafka.producer.ProducerData[K,V](pd.getTopic, pd.getKey,
asBuffer(pd.getData))): _*)
}
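
The correlation_id var added to the javaapi Producer above lets a client tag its requests before they are handed to the underlying Scala producer. A hedged sketch of how that might look; the serializer.class and broker.list values are assumptions for illustration, not something the patch prescribes:

import java.util.Properties
import kafka.producer.ProducerConfig
import kafka.javaapi.producer.{Producer, ProducerData}

val props = new Properties()
props.put("serializer.class", "kafka.serializer.StringEncoder")  // assumed encoder
props.put("broker.list", "0:localhost:9092")                     // assumed broker.id:host:port
val producer = new Producer[String, String](new ProducerConfig(props))
producer.correlation_id = 7  // forwarded to ProducerPool.send(correlation_id, ...)
producer.send(new ProducerData[String, String]("test", "hello"))
producer.close
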
Index: core/src/main/scala/kafka/javaapi/ProducerRequest.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/ProducerRequest.scala (revision 1234442)
+++ core/src/main/scala/kafka/javaapi/ProducerRequest.scala (working copy)
@@ -17,36 +17,31 @@
package kafka.javaapi
import kafka.network.Request
-import kafka.api.RequestKeys
+import kafka.api.{RequestKeys, WiredTopic}
import java.nio.ByteBuffer
-class ProducerRequest(val topic: String,
- val partition: Int,
- val messages: kafka.javaapi.message.ByteBufferMessageSet) extends Request(RequestKeys.Produce) {
+class ProducerRequest(val correlation_id: Int,
+ val client_id: String,
+ val required_acks: Short,
+ val ack_timeout: Int,
+ val data: Array[WiredTopic]) extends Request(RequestKeys.Produce) {
+
import Implicits._
- private val underlying = new kafka.api.ProducerRequest(topic, partition, messages)
+ private val underlying = new kafka.api.ProducerRequest(correlation_id, client_id, required_acks, ack_timeout, data)
+ def api(): kafka.api.ProducerRequest = underlying
+
def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) }
def sizeInBytes(): Int = underlying.sizeInBytes
- def getTranslatedPartition(randomSelector: String => Int): Int =
- underlying.getTranslatedPartition(randomSelector)
-
override def toString: String =
underlying.toString
- override def equals(other: Any): Boolean = {
- other match {
- case that: ProducerRequest =>
- (that canEqual this) && topic == that.topic && partition == that.partition &&
- messages.equals(that.messages)
- case _ => false
- }
- }
+ override def equals(other: Any): Boolean = underlying.equals(other)
def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest]
- override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode
+ override def hashCode: Int = underlying.hashCode
}