Skip to content

Instantly share code, notes, and snippets.

@jjkoshy
jjkoshy / gist:3718883
Created September 14, 2012 00:18
Topic partition tuples in kafka 0.8
git grep -n "(String, Int)" -- *.scala
core/src/main/scala/kafka/api/LeaderAndISRRequest.scala:78: val leaderAndISRInfos = new HashMap[(String, Int), LeaderAndISR]
core/src/main/scala/kafka/api/LeaderAndISRRequest.scala:96: leaderAndISRInfos: Map[(String, Int), LeaderAndISR])
core/src/main/scala/kafka/api/LeaderAndISRRequest.scala:98: def this(isInit: Boolean, leaderAndISRInfos: Map[(String, Int), LeaderAndISR]) = {
core/src/main/scala/kafka/api/LeaderAndISRResponse.scala:32: val responseMap = new HashMap[(String, Int), Short]()
core/src/main/scala/kafka/api/LeaderAndISRResponse.scala:45: responseMap: Map[(String, Int), Short],
core/src/main/scala/kafka/api/LeaderAndISRResponse.scala:60: for ((key:(String, Int), value) <- responseMap){
core/src/main/scala/kafka/api/StopReplicaRequest.scala:36: val topicPartitionPairSet = new HashSet[(String, Int)]()
@jjkoshy
jjkoshy / gist:3830723
Created October 4, 2012 00:13
metrics csv reporter output after producing to two topics
[1711][jkoshy@jkoshy-ld:~/kafka_metrics]$ ls
ActiveControllerCount.csv ISRShrinksPerSec.csv Produce-RequestsPerSec.csv
AllTopicsBytesInPerSec.csv LeaderAndIsr-LocalTimeNs.csv Produce-ResponseSendTimeNs.csv
AllTopicsBytesOutPerSec.csv LeaderAndIsr-QueueTimeNs.csv RequestQueueSize.csv
AllTopicsFailedFetchRequestsPerSec.csv LeaderAndIsr-RemoteTimeNs.csv StopReplica-LocalTimeNs.csv
AllTopicsFailedProduceRequestsPerSec.csv LeaderAndIsr-RequestsPerSec.csv StopReplica-QueueTimeNs.csv
AllTopicsMessagesInPerSec.csv LeaderAndIsr-ResponseSendTimeNs.csv StopReplica-RemoteTimeNs.csv
ConsumerExpiresPerSecond.csv LeaderAndIsr-TotalTimeNs.csv StopReplica-RequestsPerSec.csv
Fetch-Consumer-LocalTimeNs.csv LeaderCount.csv StopReplica-ResponseSendTimeNs.csv
Fetch-Consumer-QueueTimeNs.csv LogFlushRateAndTimeMs.csv StopReplica-TotalTimeNs.csv
@jjkoshy
jjkoshy / gist:3842975
Created October 5, 2012 23:04
zkserializer
client.setZkSerializer(new ZkSerializer() {
@Override
public byte[] serialize(Object o)
throws ZkMarshallingError
{
return ZKStringSerializer.serialize(o);
}
@Override
public Object deserialize(byte[] bytes)
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index c793110..d9bd99f 100644
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -22,24 +22,15 @@ import org.I0Itec.zkclient.ZkClient
import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging}
import kafka.common.KafkaException
+// RR TODO: we should get rid of this topiccount trait since it only makes sense for static topic count
+// RR TODO: and rename this to SubscriptionConfig
diff --git a/config/log4j.properties b/config/log4j.properties
index baa698b..9502254 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -41,7 +41,7 @@ log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
-log4j.appender.cleanerAppender.File=log-cleaner.log
+log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
for i in {1..50}; do ./bin/kafka-topics.sh --create --topic test$i --zookeeper localhost:2181 --partitions 4 --replication-factor 2; done
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
@jjkoshy
jjkoshy / DeleteInvalidOffsets.scala
Last active August 29, 2015 14:09
Tool to fix offset manager movement caused by KAFKA-1469
package kafka.tools
import joptsimple.OptionParser
import kafka.api._
import kafka.cluster.Broker
import kafka.consumer.SimpleConsumer
import kafka.consumer.ConsumerConfig
import kafka.network.BlockingChannel
import kafka.utils.{Logging, CommandLineUtils}
Linux x86_64, Debug, clang 3.9, libc++ . Options: brpc:shared-False
Linux x86_64, Debug, clang 3.9, libc++ . Options: brpc:shared-True
Linux x86_64, Debug, clang 3.9, libstdc++ . Options: brpc:shared-False
Linux x86_64, Debug, clang 3.9, libstdc++ . Options: brpc:shared-True
Linux x86_64, Debug, clang 4.0, libc++ . Options: brpc:shared-False
Linux x86_64, Debug, clang 4.0, libc++ . Options: brpc:shared-True
Linux x86_64, Debug, clang 4.0, libstdc++ . Options: brpc:shared-False
Linux x86_64, Debug, clang 4.0, libstdc++ . Options: brpc:shared-True
Linux x86_64, Debug, clang 5.0, libc++ . Options: brpc:shared-False
Linux x86_64, Debug, clang 5.0, libc++ . Options: brpc:shared-True