Skip to content

Instantly share code, notes, and snippets.

@alq666
Created September 29, 2015 22:00
Show Gist options
  • Star (0): you must be signed in to star a gist.
  • Fork (0): you must be signed in to fork a gist.
  • Save alq666/1796511f5caff0743561 to your computer and use it in GitHub Desktop.
From 1814e7c904072f0f67c5128d53a20d26ebb56b1a Mon Sep 17 00:00:00 2001
From: Jun Rao <junrao@gmail.com>
Date: Tue, 12 May 2015 15:37:21 -0700
Subject: [PATCH 1/3] synchronize on getting size from watchers
---
core/src/main/scala/kafka/server/RequestPurgatory.scala | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index 87ee3be..098679c 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -196,7 +196,11 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
private val requests = new util.LinkedList[T]
// return the size of the watch list
- def watched() = requests.size()
+ def watched() = {
+ synchronized {
+ requests.size()
+ }
+ }
// add the element to the watcher list if it's not already satisfied
def addIfNotSatisfied(t: T): Boolean = {
--
1.8.5.2 (Apple Git-48)
From ef381eaefea768eb95da279d1bf197aeab27a66b Mon Sep 17 00:00:00 2001
From: Jun Rao <junrao@gmail.com>
Date: Thu, 14 May 2015 09:04:38 -0700
Subject: [PATCH 2/3] add instrumentation
---
core/src/main/scala/kafka/server/FetchRequestPurgatory.scala | 2 +-
core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala | 2 +-
core/src/main/scala/kafka/server/RequestPurgatory.scala | 6 +++++-
3 files changed, 7 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala b/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
index ed13188..5a20233 100644
--- a/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
@@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit
* The purgatory holding delayed fetch requests
*/
class FetchRequestPurgatory(replicaManager: ReplicaManager, requestChannel: RequestChannel)
- extends RequestPurgatory[DelayedFetch](replicaManager.config.brokerId, replicaManager.config.fetchPurgatoryPurgeIntervalRequests) {
+ extends RequestPurgatory[DelayedFetch](replicaManager.config.brokerId, replicaManager.config.fetchPurgatoryPurgeIntervalRequests, "FetchPurgatory") {
this.logIdent = "[FetchRequestPurgatory-%d] ".format(replicaManager.config.brokerId)
private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup {
diff --git a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
index e7ff411..852841a 100644
--- a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
@@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit
* The purgatory holding delayed producer requests
*/
class ProducerRequestPurgatory(replicaManager: ReplicaManager, offsetManager: OffsetManager, requestChannel: RequestChannel)
- extends RequestPurgatory[DelayedProduce](replicaManager.config.brokerId, replicaManager.config.producerPurgatoryPurgeIntervalRequests) {
+ extends RequestPurgatory[DelayedProduce](replicaManager.config.brokerId, replicaManager.config.producerPurgatoryPurgeIntervalRequests, "ProducePurgatory") {
this.logIdent = "[ProducerRequestPurgatory-%d] ".format(replicaManager.config.brokerId)
private class DelayedProducerRequestMetrics(metricId: Option[TopicAndPartition]) extends KafkaMetricsGroup {
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index 098679c..701583e 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -65,7 +65,7 @@ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, de
* this function handles delayed requests that have hit their time limit without being satisfied.
*
*/
-abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: Int = 1000)
+abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: Int = 1000, purgatoryName: String = "")
extends Logging with KafkaMetricsGroup {
/* a list of requests watching each key */
@@ -280,6 +280,10 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
expire(curr)
}
}
+
+ debug("Checking for purging in %s; watched items: %d, purge threshold: %d".
+ format(purgatoryName, RequestPurgatory.this.watched(), purgeInterval))
+
// see if we need to purge the watch lists
if (RequestPurgatory.this.watched() >= purgeInterval) {
debug("Begin purging watch lists")
--
1.8.5.2 (Apple Git-48)
From 62f5112a398f58975ad4c163fcbd08af9e852cd0 Mon Sep 17 00:00:00 2001
From: Jun Rao <junrao@gmail.com>
Date: Fri, 15 May 2015 15:52:27 -0700
Subject: [PATCH 3/3] return from pollExpired() on expired items
---
core/src/main/scala/kafka/server/RequestPurgatory.scala | 13 +++++++------
1 file changed, 7 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index 701583e..9fa00e8 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -286,15 +286,15 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
// see if we need to purge the watch lists
if (RequestPurgatory.this.watched() >= purgeInterval) {
- debug("Begin purging watch lists")
+ debug("Begin purging watch lists in %s".format(purgatoryName))
val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum
- debug("Purged %d elements from watch lists.".format(numPurgedFromWatchers))
+ debug("Purged %d elements from watch lists in %s.".format(numPurgedFromWatchers, purgatoryName))
}
// see if we need to purge the delayed request queue
if (delayed() >= purgeInterval) {
- debug("Begin purging delayed queue")
+ debug("Begin purging delayed queue in %s".format(purgatoryName))
val purged = purgeSatisfied()
- debug("Purged %d requests from delayed queue.".format(purged))
+ debug("Purged %d requests from delayed queue %s.".format(purged, purgatoryName))
}
} catch {
case e: Exception =>
@@ -326,9 +326,10 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
if (curr == null)
return null.asInstanceOf[T]
val updated = curr.satisfied.compareAndSet(false, true)
- if(updated) {
+ if(updated)
return curr
- }
+ else
+ return null.asInstanceOf[T]
}
throw new RuntimeException("This should not happen")
}
--
1.8.5.2 (Apple Git-48)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment