Topic partition tuples in kafka 0.8
git grep -n "(String, Int)" -- *.scala
core/src/main/scala/kafka/api/LeaderAndISRRequest.scala:78: val leaderAndISRInfos = new HashMap[(String, Int), LeaderAndISR]
core/src/main/scala/kafka/api/LeaderAndISRRequest.scala:96: leaderAndISRInfos: Map[(String, Int), LeaderAndISR])
core/src/main/scala/kafka/api/LeaderAndISRRequest.scala:98: def this(isInit: Boolean, leaderAndISRInfos: Map[(String, Int), LeaderAndISR]) = {
core/src/main/scala/kafka/api/LeaderAndISRResponse.scala:32: val responseMap = new HashMap[(String, Int), Short]()
core/src/main/scala/kafka/api/LeaderAndISRResponse.scala:45: responseMap: Map[(String, Int), Short],
core/src/main/scala/kafka/api/LeaderAndISRResponse.scala:60: for ((key:(String, Int), value) <- responseMap){
core/src/main/scala/kafka/api/StopReplicaRequest.scala:36: val topicPartitionPairSet = new HashSet[(String, Int)]()
metrics csv reporter output after producing to two topics
[1711][jkoshy@jkoshy-ld:~/kafka_metrics]$ ls
ActiveControllerCount.csv ISRShrinksPerSec.csv Produce-RequestsPerSec.csv
AllTopicsBytesInPerSec.csv LeaderAndIsr-LocalTimeNs.csv Produce-ResponseSendTimeNs.csv
AllTopicsBytesOutPerSec.csv LeaderAndIsr-QueueTimeNs.csv RequestQueueSize.csv
AllTopicsFailedFetchRequestsPerSec.csv LeaderAndIsr-RemoteTimeNs.csv StopReplica-LocalTimeNs.csv
AllTopicsFailedProduceRequestsPerSec.csv LeaderAndIsr-RequestsPerSec.csv StopReplica-QueueTimeNs.csv
AllTopicsMessagesInPerSec.csv LeaderAndIsr-ResponseSendTimeNs.csv StopReplica-RemoteTimeNs.csv
ConsumerExpiresPerSecond.csv LeaderAndIsr-TotalTimeNs.csv StopReplica-RequestsPerSec.csv
Fetch-Consumer-LocalTimeNs.csv LeaderCount.csv StopReplica-ResponseSendTimeNs.csv
Fetch-Consumer-QueueTimeNs.csv LogFlushRateAndTimeMs.csv StopReplica-TotalTimeNs.csv
client.setZkSerializer(new ZkSerializer() {
public byte[] serialize(Object o)
throws ZkMarshallingError
return ZKStringSerializer.serialize(o);
public Object deserialize(byte[] bytes)
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index c793110..d9bd99f 100644
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -22,24 +22,15 @@ import org.I0Itec.zkclient.ZkClient
import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging}
import kafka.common.KafkaException
+// RR TODO: we should get rid of this topiccount trait since it only makes sense for static topic count
+// RR TODO: and rename this to SubscriptionConfig
diff --git a/config/ b/config/
index baa698b..9502254 100644
--- a/config/
+++ b/config/
@@ -41,7 +41,7 @@ log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
for i in {1..50}; do ./bin/ --create --topic test$i --zookeeper localhost:2181 --partitions 4 --replication-factor 2; done
Tool to fix offset manager movement caused by KAFKA-1469
import joptsimple.OptionParser
import kafka.api._
import kafka.cluster.Broker
import kafka.consumer.SimpleConsumer
import kafka.consumer.ConsumerConfig
import kafka.utils.{Logging, CommandLineUtils}
